""" Class for reading SNPs.
"""
"""
BSD 3-Clause License
Copyright (c) 2019, Andrew Riha
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import binascii
from copy import deepcopy
import gzip
import io
import logging
import os
import re
import warnings
import zipfile
import zlib
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
NORMALIZED_DTYPES = {
"rsid": object,
"chrom": object,
"pos": np.uint32,
"genotype": object,
}
TWO_ALLELE_DTYPES = {
"rsid": object,
"chrom": object,
"pos": np.uint32,
"allele1": object,
"allele2": object,
}
[docs]
def get_empty_snps_dataframe():
"""Get empty dataframe normalized for usage with ``snps``.
Returns
-------
pd.DataFrame
"""
df = pd.DataFrame(columns=["rsid", "chrom", "pos", "genotype"])
df = df.astype(NORMALIZED_DTYPES)
df.set_index("rsid", inplace=True)
return df
[docs]
class Reader:
"""Class for reading and parsing raw data / genotype files."""
[docs]
def __init__(self, file="", only_detect_source=False, resources=None, rsids=()):
"""Initialize a `Reader`.
Parameters
----------
file : str or bytes
path to file to load or bytes to load
only_detect_source : bool
only detect the source of the data
resources : Resources
instance of Resources
rsids : tuple, optional
rsids to extract if loading a VCF file
"""
self._file = file
self._only_detect_source = only_detect_source
self._resources = resources
self._rsids = frozenset(rsids)
[docs]
def read(self):
"""Read and parse a raw data / genotype file.
Returns
-------
dict
dict with the following items:
snps (pandas.DataFrame)
dataframe of parsed SNPs
source (str)
detected source of SNPs
phased (bool)
flag indicating if SNPs are phased
"""
file = self._file
compression = "infer"
d = {
"snps": get_empty_snps_dataframe(),
"source": "",
"phased": False,
"build": 0,
}
# peek into files to determine the data format
if isinstance(file, str) and os.path.exists(file):
if ".zip" in file:
with zipfile.ZipFile(file) as z:
with z.open(z.namelist()[0], "r") as f:
first_line, comments, data = self._extract_comments(
f, decode=True
)
compression = "zip"
elif ".gz" in file:
with gzip.open(file, "rt") as f:
first_line, comments, data = self._extract_comments(f)
compression = "gzip"
else:
with open(file, "rb") as f:
first_line, comments, data, compression = self._handle_bytes_data(
f.read()
)
elif isinstance(file, bytes):
first_line, comments, data, compression = self._handle_bytes_data(file)
file = io.BytesIO(file)
else:
return d
if "23andMe" in first_line:
# some 23andMe files have separate alleles
if comments.endswith(
"# rsid\tchromosome\tposition\tallele1\tallele2\n"
) or comments.endswith(
"# rsid\tchromosome\tposition\tallele1\tallele2\r\n"
):
d = self.read_23andme(file, compression, joined=False)
# some 23andMe files have a combined genotype
elif comments.endswith(
"# rsid\tchromosome\tposition\tgenotype\n"
) or comments.endswith("# rsid\tchromosome\tposition\tgenotype\r\n"):
d = self.read_23andme(file, compression, joined=True)
# something we havent seen before and can't handle
else:
return d
elif "AncestryDNA" in first_line:
d = self.read_ancestry(file, compression)
elif first_line.startswith("RSID"):
d = self.read_ftdna(file, compression)
elif "famfinder" in first_line:
d = self.read_ftdna_famfinder(file, compression)
elif "MyHeritage" in first_line:
d = self.read_myheritage(file, compression)
elif "Living DNA" in first_line:
d = self.read_livingdna(file, compression)
elif "SNP Name\trsID" in first_line or "SNP.Name\tSample.ID" in first_line:
d = self.read_mapmygenome(file, compression, first_line)
elif "lineage" in first_line or "snps" in first_line:
d = self.read_snps_csv(file, comments, compression)
elif "rsid\tChromosome\tposition\tgenotype" == first_line.strip():
d = self.read_tellmegen(file, compression)
elif re.match("^#*[ \t]*rsid[, \t]*chr", first_line):
d = self.read_generic(file, compression)
elif re.match("^rs[0-9]*[, \t]{1}[1]", first_line):
d = self.read_generic(file, compression, skip=0)
elif "vcf" in comments.lower() or "##contig" in comments.lower():
d = self.read_vcf(file, compression, "vcf", self._rsids)
elif ("Genes for Good" in comments) | ("PLINK" in comments):
d = self.read_genes_for_good(file, compression)
elif "DNA.Land" in comments:
d = self.read_dnaland(file, compression)
elif first_line.startswith("[Header]"):
# Global Screening Array, includes SANO and CODIGO46
d = self.read_gsa(file, compression, comments)
elif "Circle" in first_line:
d = self.read_circledna(file, compression)
# detect build from comments if build was not already detected from `read` method
if not d["build"]:
d.update({"build": self._detect_build_from_comments(comments, d["source"])})
return d
@classmethod
def read_file(cls, file, only_detect_source, resources, rsids):
warnings.warn(
"This method will be removed in a future release.", DeprecationWarning
)
r = cls(file, only_detect_source, resources, rsids)
return r.read()
def _extract_comments(self, f, decode=False, include_data=False):
line = self._read_line(f, decode)
first_line = line
comments = ""
data = ""
if first_line.startswith("#"):
while line.startswith("#"):
comments += line
line = self._read_line(f, decode)
if include_data:
while line:
data += line
line = self._read_line(f, decode)
elif first_line.startswith("[Header]"):
while not line.startswith("[Data]"):
comments += line
line = self._read_line(f, decode)
# Ignore the [Data] row
line = self._read_line(f, decode)
if include_data:
while line:
data += line
line = self._read_line(f, decode)
if not data and include_data:
data = f.read()
if decode:
data = data.decode()
if not isinstance(f, zipfile.ZipExtFile):
f.seek(0)
return first_line, comments, data
def _detect_build_from_comments(self, comments, source):
# if its a VCF parse these properly
if source == "vcf":
for line in comments.split("\n"):
line = line.strip()
if not line:
# skip blanks
continue
assert line.startswith("#"), line
if not line.startswith("##"):
# skip comments but not preamble
continue
if "=" not in line:
# skip lines without key/value in
continue
line = line[2:].strip()
key = line[: line.index("=")]
value = line[line.index("=") + 1 :]
if key.lower() == "contig":
assert value.startswith("<"), value
assert value.endswith(">"), value
parts = value[1:-1].split(",")
for part in parts:
part_key = part[: part.index("=")]
part_value = part[part.index("=") + 1 :]
if part_key.lower() == "assembly":
if "36" in part_value:
return 36
elif "37" in part_value or "hg19" in part_value:
return 37
elif "38" in part_value:
return 38
elif part_key.lower() == "length":
if "249250621" == part_value:
return 37 # length of chromosome 1
elif "248956422" == part_value:
return 38 # length of chromosome 1
# couldn't find anything
return 0
else:
# not a vcf
if "build 36" in comments.lower():
return 36
elif "build 37" in comments.lower():
return 37
elif "build 38" in comments.lower():
return 38
# these can cause false positives
# elif "b37" in comments.lower():
# return 37
# elif "b38" in comments.lower():
# return 38
# elif "hg19" in comments.lower():
# return 37
# elif "hg38" in comments.lower():
# return 38
elif "grch38" in comments.lower():
return 38
elif "grch37" in comments.lower():
return 37
elif "249250621" in comments.lower():
return 37 # length of chromosome 1
elif "248956422" in comments.lower():
return 38 # length of chromosome 1
else:
return 0
def _handle_bytes_data(self, file, include_data=False):
compression = "infer"
if self.is_zip(file):
compression = "zip"
with zipfile.ZipFile(io.BytesIO(file)) as z:
namelist = z.namelist()
key = "GFG_filtered_unphased_genotypes_23andMe.txt"
key_search = [key in name for name in namelist]
if any(key_search):
filename = namelist[key_search.index(True)]
else:
filename = namelist[0]
with z.open(filename, "r") as f:
first_line, comments, data = self._extract_comments(
f, decode=True, include_data=include_data
)
elif self.is_gzip(file):
compression = "gzip"
with gzip.open(io.BytesIO(file), "rb") as f:
first_line, comments, data = self._extract_comments(
f, decode=True, include_data=include_data
)
else:
compression = None
file = io.BytesIO(file)
first_line, comments, data = self._extract_comments(
deepcopy(file), decode=True, include_data=include_data
)
file.seek(0)
return first_line, comments, data, compression
[docs]
@staticmethod
def is_zip(bytes_data):
"""Check whether or not a bytes_data file is a valid Zip file."""
return zipfile.is_zipfile(io.BytesIO(bytes_data))
[docs]
@staticmethod
def is_gzip(bytes_data):
"""Check whether or not a bytes_data file is a valid gzip file."""
return binascii.hexlify(bytes_data[:2]) == b"1f8b"
@staticmethod
def _read_line(f, decode):
if decode:
# https://stackoverflow.com/a/606199
return f.readline().decode("utf-8")
else:
return f.readline()
[docs]
def read_helper(self, source, parser):
"""Generic method to help read files.
Parameters
----------
source : str
name of data source
parser : func
parsing function, which returns a tuple with the following items:
0 (pandas.DataFrame)
dataframe of parsed SNPs (empty if only detecting source)
1 (bool), optional
flag indicating if SNPs are phased
2 (int), optional
detected build of SNPs
Returns
-------
dict
dict with the following items:
snps (pandas.DataFrame)
dataframe of parsed SNPs
source (str)
detected source of SNPs
phased (bool)
flag indicating if SNPs are phased
build (int)
detected build of SNPs
References
----------
1. Fluent Python by Luciano Ramalho (O'Reilly). Copyright 2015 Luciano Ramalho,
978-1-491-94600-8.
"""
phased = False
build = 0
if self._only_detect_source:
df = get_empty_snps_dataframe()
else:
df, *extra = parser()
if len(extra) == 1:
phased = extra[0]
elif len(extra) == 2:
phased = extra[0]
build = extra[1]
return {"snps": df, "source": source, "phased": phased, "build": build}
[docs]
def read_23andme(self, file, compression, joined=True):
"""Read and parse 23andMe file.
https://www.23andme.com
Parameters
----------
file : str
path to file
Returns
-------
dict
result of `read_helper`
"""
def parser():
if joined:
columnnames = ["rsid", "chrom", "pos", "genotype"]
else:
columnnames = ["rsid", "chrom", "pos", "allele1", "allele2"]
df = pd.read_csv(
file,
comment="#",
sep="\t",
na_values=["--", "-"],
names=columnnames,
compression=compression,
)
# turn number numbers into string numbers
df["chrom"] = df["chrom"].map(
{
"1": "1",
"2": "2",
"3": "3",
"4": "4",
"5": "5",
"6": "6",
"7": "7",
"8": "8",
"9": "9",
"10": "10",
"11": "11",
"12": "12",
"13": "13",
"14": "14",
"15": "15",
"16": "16",
"17": "17",
"18": "18",
"19": "19",
"20": "20",
"21": "21",
"22": "22",
1: "1",
2: "2",
3: "3",
4: "4",
5: "5",
6: "6",
7: "7",
8: "8",
9: "9",
10: "10",
11: "11",
12: "12",
13: "13",
14: "14",
15: "15",
16: "16",
17: "17",
18: "18",
19: "19",
20: "20",
21: "21",
22: "22",
"X": "X",
"Y": "Y",
"MT": "MT",
}
)
if not joined:
# stick separate alleles together
df["genotype"] = df["allele1"] + df["allele2"]
del df["allele1"]
del df["allele2"]
df = df.dropna(subset=["rsid", "chrom", "pos"])
df = df.astype(dtype=NORMALIZED_DTYPES)
df = df.set_index("rsid")
return (df,)
return self.read_helper("23andMe", parser)
[docs]
def read_ftdna(self, file, compression):
"""Read and parse Family Tree DNA (FTDNA) file.
https://www.familytreedna.com
Parameters
----------
file : str
path to file
Returns
-------
dict
result of `read_helper`
"""
def parser():
try:
df = pd.read_csv(
file,
skiprows=1,
na_values="--",
names=["rsid", "chrom", "pos", "genotype"],
index_col=0,
dtype=NORMALIZED_DTYPES,
compression=compression,
)
except ValueError:
# read files with second header for concatenated data
if isinstance(file, io.BytesIO):
file.seek(0)
(*data,) = self._handle_bytes_data(file.read(), include_data=True)
file.seek(0)
else:
with open(file, "rb") as f:
(*data,) = self._handle_bytes_data(f.read(), include_data=True)
# reconstruct file content from `_handle_bytes_data` results
lines = data[0] + data[2]
lines = [line.strip() for line in lines.split("\n")]
# find index of second header
second_header_idx = lines.index("RSID,CHROMOSOME,POSITION,RESULT", 1)
df = pd.read_csv(
file,
skiprows=[0, second_header_idx],
na_values="--",
names=["rsid", "chrom", "pos", "genotype"],
index_col=0,
dtype=NORMALIZED_DTYPES,
compression=compression,
)
except OSError:
# read concatenated gzip files with extra data
if isinstance(file, io.BytesIO):
file.seek(0)
data = file.getbuffer()
else:
with open(file, "rb") as f:
data = f.read()
# https://stackoverflow.com/q/4928560
# https://stackoverflow.com/a/37042747
decompressor = zlib.decompressobj(31)
# decompress data from first concatenated gzip file
data = decompressor.decompress(data)
# decompress data from second concatenated gzip file
additional_data = zlib.decompress(decompressor.unused_data, 31)
data += additional_data[33:] # skip over second header
new_file = io.BytesIO(data)
df = pd.read_csv(
new_file,
skiprows=1,
na_values="--",
names=["rsid", "chrom", "pos", "genotype"],
index_col=0,
dtype=NORMALIZED_DTYPES,
compression=None, # already decompressed
)
return (df,)
return self.read_helper("FTDNA", parser)
[docs]
def read_ftdna_famfinder(self, file, compression):
"""Read and parse Family Tree DNA (FTDNA) "famfinder" file.
https://www.familytreedna.com
Parameters
----------
file : str
path to file
Returns
-------
dict
result of `read_helper`
"""
def parser():
df = pd.read_csv(
file,
comment="#",
na_values="-",
names=["rsid", "chrom", "pos", "allele1", "allele2"],
index_col=0,
dtype=TWO_ALLELE_DTYPES,
compression=compression,
)
# create genotype column from allele columns
df["genotype"] = df["allele1"] + df["allele2"]
# delete allele columns
# http://stackoverflow.com/a/13485766
del df["allele1"]
del df["allele2"]
return (df,)
return self.read_helper("FTDNA", parser)
[docs]
def read_ancestry(self, file, compression):
"""Read and parse Ancestry.com file.
http://www.ancestry.com
Parameters
----------
file : str
path to file
Returns
-------
dict
result of `read_helper`
"""
def parser():
df = pd.read_csv(
file,
comment="#",
header=0,
engine="c",
sep=r"\s+",
# delim_whitespace=True, # https://stackoverflow.com/a/15026839
na_values=0,
names=["rsid", "chrom", "pos", "allele1", "allele2"],
index_col=0,
dtype=TWO_ALLELE_DTYPES,
compression=compression,
)
# create genotype column from allele columns
df["genotype"] = df["allele1"] + df["allele2"]
# delete allele columns
# http://stackoverflow.com/a/13485766
del df["allele1"]
del df["allele2"]
# https://redd.it/5y90un
df.iloc[np.where(df["chrom"] == "23")[0], 0] = "X"
df.iloc[np.where(df["chrom"] == "24")[0], 0] = "Y"
df.iloc[np.where(df["chrom"] == "25")[0], 0] = "PAR"
df.iloc[np.where(df["chrom"] == "26")[0], 0] = "MT"
return (df,)
return self.read_helper("AncestryDNA", parser)
[docs]
def read_myheritage(self, file, compression):
"""Read and parse MyHeritage file.
https://www.myheritage.com
Parameters
----------
file : str
path to file
Returns
-------
dict
result of `read_helper`
"""
def parser():
if isinstance(file, str):
with open(file, "rb") as f:
first_line, comments, data, comrpession = self._handle_bytes_data(
f.read(), include_data=True
)
else:
first_line, comments, data, compression = self._handle_bytes_data(
file.read(), include_data=True
)
file_string_in = io.StringIO(data)
file_string_out = io.StringIO()
for line in file_string_in:
# user the number of quotes in a line to tell old from new
if line.count('"') == 14:
# extra-quoted new variant file
# can all be stripped so pandas can read it normally
line = line.replace('"', "")
# take it apart and put it back together so it looks
# like the older MyHeritage files
line = '"' + '","'.join(line.strip().split(",")) + '"\n'
file_string_out.write(line)
return (
pd.read_csv(
io.StringIO(file_string_out.getvalue()),
comment="#",
header=0,
na_values="--",
names=["rsid", "chrom", "pos", "genotype"],
index_col=0,
dtype=NORMALIZED_DTYPES,
),
)
return self.read_helper("MyHeritage", parser)
[docs]
def read_livingdna(self, file, compression):
"""Read and parse LivingDNA file.
https://livingdna.com/
Parameters
----------
file : str
path to file
Returns
-------
dict
result of `read_helper`
"""
def parser():
return (
pd.read_csv(
file,
comment="#",
sep="\t",
na_values="--",
names=["rsid", "chrom", "pos", "genotype"],
index_col=0,
dtype=NORMALIZED_DTYPES,
compression=compression,
),
)
return self.read_helper("LivingDNA", parser)
[docs]
def read_mapmygenome(self, file, compression, header):
"""Read and parse Mapmygenome file.
https://mapmygenome.in
Parameters
----------
file : str
path to file
Returns
-------
dict
result of `read_helper`
"""
def parser():
def parse(rsid_col_name, rsid_col_idx):
return pd.read_csv(
file,
comment="#",
sep="\t",
na_values="--",
header=0,
index_col=rsid_col_idx,
dtype={
rsid_col_name: object,
"Chr": object,
"Position": np.uint32,
"Allele1...Top": object,
"Allele2...Top": object,
},
compression=compression,
)
if "rsID" in header:
df = parse("rsID", 1)
else:
df = parse("SNP.Name", 0)
# uses Illumina definition of "Plus" from https://emea.support.illumina.com/bulletins/2017/06/how-to-interpret-dna-strand-and-allele-information-for-infinium-.html
df["genotype"] = df["Allele1...Plus"] + df["Allele2...Plus"]
df.rename(columns={"Chr": "chrom", "Position": "pos"}, inplace=True)
df.index.name = "rsid"
df = df[["chrom", "pos", "genotype"]]
return (df,)
return self.read_helper("Mapmygenome", parser)
[docs]
def read_genes_for_good(self, file, compression):
"""Read and parse Genes For Good file.
https://genesforgood.sph.umich.edu/readme/readme1.2.txt
Parameters
----------
file : str
path to file
Returns
-------
dict
result of `read_helper`
"""
def parser():
return (
pd.read_csv(
file,
comment="#",
sep="\t",
na_values="--",
names=["rsid", "chrom", "pos", "genotype"],
index_col=0,
dtype=NORMALIZED_DTYPES,
compression=compression,
),
)
return self.read_helper("GenesForGood", parser)
def _read_gsa_helper(self, file, source):
def parser():
# read the comments so we get to the actual data
if isinstance(file, str):
try:
with open(file, "rb") as f:
_, _, data = self._extract_comments(
f, decode=True, include_data=True
)
except UnicodeDecodeError:
# compressed file on filesystem
with open(file, "rb") as f:
_, _, data, _ = self._handle_bytes_data(
f.read(), include_data=True
)
else:
_, _, data, _ = self._handle_bytes_data(file.read(), include_data=True)
# turn the data into a pandas dataframe for manipulation
df = pd.read_csv(
io.StringIO(data),
sep="\t",
engine="c",
dtype={
"Position": NORMALIZED_DTYPES["pos"],
"Chr": NORMALIZED_DTYPES["chrom"],
},
)
# reserve columns we want out
assert "rsid" not in df.columns
assert "chrom" not in df.columns
assert "pos" not in df.columns
assert "genotype" not in df.columns
# prefer the specified chromosome and position, in prsent
# this uses the gsa names, so needs to be done before rsid
if "Chr" in df.columns and "Position" in df.columns:
# put the chromosome in the right column with the right type
df["chrom"] = df["Chr"]
# put the position in the right column with the right type
df["pos"] = df["Position"]
else:
# use an external source to map snp names to chromosome and position
df = df.merge(
self._resources.get_gsa_chrpos(),
how="inner", # inner join, everything must have a position
left_on="SNP Name",
right_on="gsaname_chrpos",
suffixes=(None, "_gsa"),
)
# make sure its the right types
df["chrom"] = df["gsachr"].apply(str).astype(NORMALIZED_DTYPES["chrom"])
df["pos"] = df["gsapos"].astype(NORMALIZED_DTYPES["pos"])
# use the given rsid when avaliable, SNP Name when unavaliable
df["rsid"] = df["SNP Name"].astype(NORMALIZED_DTYPES["rsid"])
if "RsID" in df.columns:
df.loc[df["RsID"] != ".", "rsid"] = df.loc[df["rsid"] != ".", "RsID"]
else:
# if given RSIDs are not avaliable, then use the external mapping to turn
# SNP names into rsids where possible
df = df.merge(
self._resources.get_gsa_rsid(),
how="left", # left-hand join, gsa rsids may be NA
left_on="SNP Name",
right_on="gsaname_rsid",
suffixes=(None, "_gsa_rsid"),
)
df.loc[~pd.isna(df["gsaname_rsid"]), "rsid"] = df.loc[
~pd.isna(df["gsaname_rsid"]), "gsarsid"
]
# combine the alleles into genotype
# prefer Plus strand as that is forward reference
if "Allele1 - Plus" in df.columns and "Allele2 - Plus" in df.columns:
df["genotype"] = (df["Allele1 - Plus"] + df["Allele2 - Plus"]).astype(
NORMALIZED_DTYPES["genotype"]
)
elif (
"Allele1 - Forward" in df.columns and "Allele2 - Forward" in df.columns
):
# if strand is forward, need to take reverse complement of *some* rsids
# this is because it is Illumina forward, which is dbSNP strand, which
# is reverse reference for some RSIDs before dbSNP 151.
# load list of reversable rsids
dbsnp151 = self._resources.get_dbsnp_151_37_reverse()
# keep only the rsids
dbsnp151 = dbsnp151.filter(items=("dbsnp151revrsid",), axis=1)
# add it as an extra column
df = df.merge(
dbsnp151,
how="left",
left_on="rsid",
right_on="dbsnp151revrsid",
suffixes=(None, "_dbsnp151rev"),
)
# create plus strand columns from the forward alleles and flip them if appropriate
for i in (1, 2):
df[f"Allele{i} - Plus"] = df[f"Allele{i} - Forward"]
df.loc[
(df[f"Allele{i} - Forward"] == "A")
& (~pd.isna(df["dbsnp151revrsid"])),
f"Allele{i} - Plus",
] = "T"
df.loc[
(df[f"Allele{i} - Forward"] == "T")
& (~pd.isna(df["dbsnp151revrsid"])),
f"Allele{i} - Plus",
] = "A"
df.loc[
(df[f"Allele{i} - Forward"] == "C")
& (~pd.isna(df["dbsnp151revrsid"])),
f"Allele{i} - Plus",
] = "G"
df.loc[
(df[f"Allele{i} - Forward"] == "G")
& (~pd.isna(df["dbsnp151revrsid"])),
f"Allele{i} - Plus",
] = "C"
# create a genotype by combining the new plus columns
df["genotype"] = (df["Allele1 - Plus"] + df["Allele2 - Plus"]).astype(
NORMALIZED_DTYPES["genotype"]
)
else:
raise ValueError("No supported allele column")
# mark -- genotype as na
df.loc[df["genotype"] == "--", "genotype"] = np.nan
# keep only the columns we want
df = df.filter(items=("rsid", "chrom", "pos", "genotype"), axis=1)
# discard rows without values
df.dropna(subset=["rsid", "chrom", "pos"], inplace=True)
# reindex for the new identifiers
df.set_index(["rsid"], inplace=True)
return (df,)
return self.read_helper(source, parser)
[docs]
def read_tellmegen(self, file, compression):
"""Read and parse tellmeGen files.
https://www.tellmegen.com/
Parameters
----------
data : str
data string
Returns
-------
dict
result of `read_helper`
"""
def parser():
df = pd.read_csv(
file,
sep="\t",
skiprows=1,
na_values="--",
names=["rsid", "chrom", "pos", "genotype"],
dtype=NORMALIZED_DTYPES,
compression=compression,
)
# use the external mapping to turn
# SNP names into rsids where possible
df = df.merge(
self._resources.get_gsa_rsid(),
how="left", # left-hand join, gsa rsids may be NA
left_on="rsid",
right_on="gsaname_rsid",
suffixes=(None, "_gsa_rsid"),
)
df.loc[~pd.isna(df["gsaname_rsid"]), "rsid"] = df.loc[
~pd.isna(df["gsaname_rsid"]), "gsarsid"
]
# keep only the columns we want
df = df.filter(items=("rsid", "chrom", "pos", "genotype"), axis=1)
# reindex for the new identifiers
df.set_index(["rsid"], inplace=True)
return (df,)
return self.read_helper("tellmeGen", parser)
[docs]
def read_gsa(self, data_or_filename, compresion, comments):
"""Read and parse Illumina Global Screening Array files
Parameters
----------
data_or_filename : str or bytes
either the filename to read from or the bytes data itself
Returns
-------
dict
result of `read_helper`
"""
# pick the source
# ideally we want something more specific than GSA
if "SANO" in comments:
source = "Sano"
elif "CODIGO46" in comments:
source = "Codigo46"
else:
# default to generic global screening array
source = "GSA"
return self._read_gsa_helper(data_or_filename, source)
[docs]
def read_dnaland(self, file, compression):
"""Read and parse DNA.land files.
https://dna.land/
Parameters
----------
data : str
data string
Returns
-------
dict
result of `read_helper`
"""
def parser():
return (
pd.read_csv(
file,
comment="#",
sep="\t",
na_values="--",
names=["rsid", "chrom", "pos", "genotype"],
index_col=0,
dtype=NORMALIZED_DTYPES,
compression=compression,
),
)
return self.read_helper("DNA.Land", parser)
[docs]
def read_circledna(self, file, compression):
"""Read and parse CircleDNA file.
https://circledna.com/
Notes
-----
This method attempts to read and parse a whole exome file, optionally compressed
with gzip or zip. Some assumptions are made throughout this process:
* SNPs that are not annotated with an RSID are skipped
* Insertions and deletions are skipped
Parameters
----------
file : str or bytes
path to file or bytes to load
Returns
-------
dict
result of `read_helper`
"""
def parser():
rs_chunks = []
with pd.read_csv(
file,
comment="#",
sep="\t",
chunksize=10000,
names=["rsid", "chrom", "pos", "genotype"],
compression=compression,
) as reader:
for chunk in reader:
# filter for SNPs with rsids
tmp = chunk.loc[
(chunk.rsid.str.startswith("rs"))
& (chunk.genotype.str.len() == 3)
]
if len(tmp) > 0:
rs_chunks.append(tmp)
df = pd.concat(rs_chunks)
df.chrom = df.chrom.str[3:]
df.genotype = df.genotype.apply(lambda x: "".join(x.split("/")))
df = df.astype(NORMALIZED_DTYPES)
df.set_index("rsid", inplace=True)
return (df,)
return self.read_helper("CircleDNA", parser)
[docs]
def read_snps_csv(self, file, comments, compression):
"""Read and parse CSV file generated by ``snps``.
https://pypi.org/project/snps/
Parameters
----------
file : str or buffer
path to file or buffer to read
comments : str
comments at beginning of file
Returns
-------
dict
result of `read_helper`
"""
source = ""
phased = False
build = 0
comment_lines = comments.split("\n")
for comment1 in comment_lines:
if "Source(s):" in comment1:
source = comment1.split("Source(s):")[1].strip()
elif "Phased:" in comment1:
if comment1.split("Phased:")[1].strip() == "True":
phased = True
elif "Build:" in comment1:
temp = int(comment1.split("Build:")[1].strip())
for comment2 in comment_lines:
if "Build Detected:" in comment2:
# only assign build if it was detected
if comment2.split("Build Detected:")[1].strip() == "True":
build = temp
break
def parser():
def parse_csv(sep):
return pd.read_csv(
file,
sep=sep,
comment="#",
header=0,
na_values="--",
names=["rsid", "chrom", "pos", "genotype"],
index_col=0,
dtype=NORMALIZED_DTYPES,
compression=compression,
)
try:
return (parse_csv(","), phased)
except pd.errors.ParserError:
if isinstance(file, io.BufferedIOBase):
file.seek(0)
return (parse_csv("\t"), phased, build)
return self.read_helper(source, parser)
[docs]
def read_generic(self, file, compression, skip=1):
"""Read and parse generic CSV or TSV file.
Notes
-----
Assumes columns are 'rsid', 'chrom' / 'chromosome', 'pos' / 'position', and 'genotype';
values are comma separated; unreported genotypes are indicated by '--'; and one header row
precedes data. For example:
rsid,chromosome,position,genotype
rs1,1,1,AA
rs2,1,2,CC
rs3,1,3,--
Parameters
----------
file : str
path to file
Returns
-------
dict
result of `read_helper`
"""
def parser():
def parse(sep):
return pd.read_csv(
file,
sep=sep,
skiprows=skip,
na_values="--",
names=["rsid", "chrom", "pos", "genotype"],
index_col=0,
dtype=NORMALIZED_DTYPES,
compression=compression,
)
try:
df = parse(",")
except ValueError:
try:
if isinstance(file, io.BufferedIOBase):
file.seek(0)
df = parse("\t")
except ValueError:
if isinstance(file, io.BufferedIOBase):
file.seek(0)
df = pd.read_csv(
file,
sep=None,
na_values="--",
skiprows=skip,
engine="python",
names=["rsid", "chrom", "pos", "genotype"],
usecols=[0, 1, 2, 3],
index_col=0,
dtype=NORMALIZED_DTYPES,
compression=compression,
)
return (df,)
return self.read_helper("generic", parser)
[docs]
def read_vcf(self, file, compression, provider, rsids=()):
"""Read and parse VCF file.
Notes
-----
This method attempts to read and parse a VCF file or buffer, optionally
compressed with gzip. Some assumptions are made throughout this process:
* SNPs that are not annotated with an RSID are skipped
* If the VCF contains multiple samples, only the first sample is used to
lookup the genotype
* Insertions and deletions are skipped
* If a sample allele is not specified, the genotype is reported as NaN
* If a sample allele refers to a REF or ALT allele that is not specified,
the genotype is reported as NaN
Parameters
----------
file : str or bytes
path to file or bytes to load
rsids : tuple, optional
rsids to extract if loading a VCF file
Returns
-------
dict
result of `read_helper`
"""
def parser():
if not isinstance(file, io.BytesIO):
with open(file, "rb") as f:
df, phased = self._parse_vcf(f, rsids)
else:
df, phased = self._parse_vcf(file, rsids)
return (df, phased)
return self.read_helper(provider, parser)
def _parse_vcf(self, buffer, rsids):
rows = []
phased = True
first_four_bytes = buffer.read(4)
buffer.seek(0)
if self.is_gzip(first_four_bytes):
f = gzip.open(buffer)
else:
f = buffer
logged_multi_sample = False
with io.TextIOWrapper(io.BufferedReader(f)) as file:
for line in file:
line_strip = line.strip("\n")
# skip blank lines
if not line_strip:
continue
# skip comment lines
if line_strip.startswith("#"):
continue
rsid = line_strip.split("\t")[2]
# skip SNPs with missing rsIDs.
if rsid == ".":
continue
if rsids:
if rsid not in rsids:
continue
line_split = line_strip.split("\t")
# snps does not yet support multi-sample vcf.
if not logged_multi_sample and len(line_split) > 10:
logger.info("Multiple samples detected in the vcf file")
logged_multi_sample = True
ref = line_split[3]
alt = line_split[4]
if len(alt.split(",")) > 1 and alt.split(",")[1] == "<NON_REF>":
alt = alt.split(",")[0]
ref_alt = [ref] + alt.split(",")
# skip insertions and deletions
if sum(map(len, ref_alt)) > len(ref_alt):
continue
# GT (genotype) is usually the first sample-specific field
# | = diploid phased
# / = diploid unphased
# or haploid e.g. male sex chromosome
genotype = ""
zygote = line_split[9]
zygote = zygote.split(":")[0]
for z in zygote.replace("|", "/").split("/"):
if z == ".":
# missing genotype
genotype = np.nan
break
z = int(z)
if z >= len(ref_alt):
# invalid genotype number
genotype = np.nan
break
elif ref_alt[z] == ".":
# missing genotype in ref or alt
genotype = np.nan
break
else:
genotype = genotype + ref_alt[z]
if "/" in zygote and pd.notna(genotype):
phased = False
record_array = [
rsid,
f"{line_split[0]}".strip("chr"),
line_split[1],
genotype,
]
rows.append(record_array)
if len(rows) == 0:
phased = False
df = pd.DataFrame(rows, columns=["rsid", "chrom", "pos", "genotype"])
df = df.astype(NORMALIZED_DTYPES)
df.set_index("rsid", inplace=True, drop=True)
return (df, phased)