| from collections import deque, Counter
|
| import warnings
|
| import pandas as pd
|
| import numpy as np
|
| from xml.etree import ElementTree as ET
|
| import math
|
|
|
# Root terms of the three Gene Ontology sub-ontologies.
BIOLOGICAL_PROCESS = 'GO:0008150'
MOLECULAR_FUNCTION = 'GO:0003674'
CELLULAR_COMPONENT = 'GO:0005575'

# Short sub-ontology key -> root GO id.
FUNC_DICT = {
    'cc': CELLULAR_COMPONENT,
    'mf': MOLECULAR_FUNCTION,
    'bp': BIOLOGICAL_PROCESS}

# Short sub-ontology key -> OBO namespace string (matches the 'namespace'
# field of go.obo term stanzas).
NAMESPACES = {
    'cc': 'cellular_component',
    'mf': 'molecular_function',
    'bp': 'biological_process'
}

# GO evidence codes treated as experimental (set literal instead of the
# slower/noisier set([...]) construction).
EXP_CODES = {
    'EXP', 'IDA', 'IPI', 'IMP', 'IGI', 'IEP', 'TAS', 'IC',
    'HTP', 'HDA', 'HMP', 'HGI', 'HEP'}

# NCBI taxonomy ids (as strings) of the CAFA evaluation target organisms.
CAFA_TARGETS = {
    '287', '3702', '4577', '6239', '7227', '7955', '9606', '9823', '10090',
    '10116', '44689', '83333', '99287', '226900', '243273', '284812', '559292'}
|
|
|
|
|
def is_cafa_target(org):
    """Report whether taxon id *org* belongs to the CAFA target organism set."""
    found = org in CAFA_TARGETS
    return found
|
|
|
|
|
def is_exp_code(code):
    """Report whether GO evidence code *code* counts as experimental."""
    experimental = code in EXP_CODES
    return experimental
|
|
|
|
|
def get_goplus_defs(filename='data/definitions.txt'):
    """Load GO-Plus cross-ontology definitions from a text file.

    Each non-blank line has the form ``GO_XXXXXXX: TERM_A and TERM_B ...``.
    Underscores are normalized to colons on both sides.

    Args:
        filename: path to the definitions file.

    Returns:
        dict mapping a GO id ('GO:XXXXXXX') to the set of ids in its definition.
    """
    plus_defs = {}
    with open(filename) as f:
        for line in f:
            line = line.strip()
            if not line:
                # Skip blank lines (the original unpacking raised ValueError).
                continue
            # maxsplit=1 so a ': ' inside the definition cannot break unpacking.
            go_id, definition = line.split(': ', 1)
            go_id = go_id.replace('_', ':')
            definition = definition.replace('_', ':')
            plus_defs[go_id] = set(definition.split(' and '))
    return plus_defs
|
|
|
|
|
class Ontology(object):
    """In-memory Gene Ontology (GO) graph loaded from an OBO flat file.

    ``self.ont`` maps a GO id (and each of its alt_ids, which alias the same
    dict object) to a term dict with keys 'id', 'name', 'namespace', 'is_a',
    'part_of', 'regulates', 'alt_ids', 'is_obsolete' and 'children'.
    Also supports information-content (IC) computation over a corpus of
    annotation sets via :meth:`calculate_ic`.
    """

    def __init__(self, filename='data/go.obo', with_rels=False):
        # with_rels: when True, 'relationship' targets are folded into 'is_a'
        # during load (see the note inside load()).
        self.ont = self.load(filename, with_rels)
        self.ic = None       # go_id -> information content; set by calculate_ic
        self.ic_norm = 0.0   # maximum IC observed; denominator for get_norm_ic

    def has_term(self, term_id):
        """Return True if *term_id* (primary or alt id) is in the ontology."""
        return term_id in self.ont

    def get_term(self, term_id):
        """Return the term dict for *term_id*, or None if unknown."""
        if self.has_term(term_id):
            return self.ont[term_id]
        return None

    def calculate_ic(self, annots):
        """Compute information content for every GO id in *annots*.

        *annots* is an iterable of iterables of GO ids (e.g. one annotation
        set per protein).  For each term t: IC(t) = log2(min_parent_count /
        count(t)).  A term with no known parents uses its own count, giving
        IC 0.  Side effects: fills ``self.ic`` and raises ``self.ic_norm``
        to the maximum IC seen.
        """
        cnt = Counter()
        for x in annots:
            cnt.update(x)
        self.ic = {}
        for go_id, n in cnt.items():
            parents = self.get_parents(go_id)
            if len(parents) == 0:
                # Root (or orphan) term: define IC relative to itself (= 0).
                min_n = n
            else:
                min_n = min([cnt[x] for x in parents])

            self.ic[go_id] = math.log(min_n / n, 2)
            self.ic_norm = max(self.ic_norm, self.ic[go_id])

    def get_ic(self, go_id):
        """Return the IC of *go_id*, or 0.0 for ids absent from the corpus.

        Raises:
            Exception: if calculate_ic() has not been called yet.
        """
        if self.ic is None:
            raise Exception('Not yet calculated')
        if go_id not in self.ic:
            return 0.0
        return self.ic[go_id]

    def get_norm_ic(self, go_id):
        """Return the IC of *go_id* normalized by the maximum observed IC.

        NOTE(review): divides by ``self.ic_norm`` unguarded — this is 0.0
        before calculate_ic() runs or if every IC is 0; confirm callers only
        use this after a non-trivial IC computation.
        """
        return self.get_ic(go_id) / self.ic_norm

    def load(self, filename, with_rels):
        """Parse an OBO 1.2 flat file into the term-dict mapping.

        Obsolete terms are dropped, alt_ids alias the same dict object as
        their primary id, and reverse 'children' edges are filled in last.
        """
        ont = dict()
        obj = None  # the term dict currently being accumulated, if any
        with open(filename, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                if line == '[Term]':
                    # Flush the previous term before starting a new stanza.
                    if obj is not None:
                        ont[obj['id']] = obj
                    obj = dict()
                    obj['is_a'] = list()
                    obj['part_of'] = list()
                    obj['regulates'] = list()
                    obj['alt_ids'] = list()
                    obj['is_obsolete'] = False
                    continue
                elif line == '[Typedef]':
                    # A [Typedef] stanza terminates the current term; the
                    # typedef's own lines are then skipped (obj is None).
                    if obj is not None:
                        ont[obj['id']] = obj
                        obj = None
                else:
                    if obj is None:
                        continue
                    l = line.split(": ")
                    if l[0] == 'id':
                        obj['id'] = l[1]
                    elif l[0] == 'alt_id':
                        obj['alt_ids'].append(l[1])
                    elif l[0] == 'namespace':
                        obj['namespace'] = l[1]
                    elif l[0] == 'is_a':
                        # Strip the trailing " ! <term name>" comment.
                        obj['is_a'].append(l[1].split(' ! ')[0])
                    elif with_rels and l[0] == 'relationship':
                        it = l[1].split()

                        # NOTE(review): every relationship type (part_of,
                        # regulates, ...) is folded into 'is_a' here, not a
                        # selected subset — confirm this is intended.
                        obj['is_a'].append(it[1])
                    elif l[0] == 'name':
                        obj['name'] = l[1]
                    elif l[0] == 'is_obsolete' and l[1] == 'true':
                        obj['is_obsolete'] = True
        if obj is not None:
            # Flush the final term of the file.
            ont[obj['id']] = obj
        for term_id in list(ont.keys()):
            # Alias every alt_id to the same shared term object.
            for t_id in ont[term_id]['alt_ids']:
                ont[t_id] = ont[term_id]
            if ont[term_id]['is_obsolete']:
                # NOTE(review): only the primary id of an obsolete term is
                # removed; its alt_id aliases (added just above) remain.
                del ont[term_id]
        for term_id, val in ont.items():
            # Build reverse (children) edges from the is_a links.
            if 'children' not in val:
                val['children'] = set()
            for p_id in val['is_a']:
                if p_id in ont:
                    if 'children' not in ont[p_id]:
                        ont[p_id]['children'] = set()
                    ont[p_id]['children'].add(term_id)

        return ont

    def get_anchestors(self, term_id):
        """Return *term_id* plus all transitive ancestors (BFS over is_a).

        The historical 'anchestors' spelling is kept for existing callers.
        Returns an empty set for unknown ids.
        """
        if term_id not in self.ont:
            return set()
        term_set = set()
        q = deque()
        q.append(term_id)
        while (len(q) > 0):
            t_id = q.popleft()
            if t_id not in term_set:
                term_set.add(t_id)
                for parent_id in self.ont[t_id]['is_a']:
                    if parent_id in self.ont:
                        q.append(parent_id)
        return term_set

    def get_prop_terms(self, terms):
        """Propagate annotations upward: union of ancestor sets of *terms*."""
        prop_terms = set()

        for term_id in terms:
            prop_terms |= self.get_anchestors(term_id)
        return prop_terms

    def get_parents(self, term_id):
        """Return the direct is_a parents of *term_id* present in the ontology."""
        if term_id not in self.ont:
            return set()
        term_set = set()
        for parent_id in self.ont[term_id]['is_a']:
            if parent_id in self.ont:
                term_set.add(parent_id)
        return term_set

    def get_namespace_terms(self, namespace):
        """Return all term ids whose 'namespace' field equals *namespace*."""
        terms = set()
        for go_id, obj in self.ont.items():
            if obj['namespace'] == namespace:
                terms.add(go_id)
        return terms

    def get_namespace(self, term_id):
        """Return the namespace of *term_id* (raises KeyError if unknown)."""
        return self.ont[term_id]['namespace']

    def get_term_set(self, term_id):
        """Return *term_id* plus all descendants (BFS over children edges)."""
        if term_id not in self.ont:
            return set()
        term_set = set()
        q = deque()
        q.append(term_id)
        while len(q) > 0:
            t_id = q.popleft()
            if t_id not in term_set:
                term_set.add(t_id)
                for ch_id in self.ont[t_id]['children']:
                    q.append(ch_id)
        return term_set
|
|
|
|
|
def read_fasta(filename):
    """Parse a FASTA file.

    Args:
        filename: path to the FASTA file.

    Returns:
        (info, seqs): parallel lists where info[i] is the first
        whitespace-delimited token of record i's header (without '>') and
        seqs[i] is its concatenated sequence.  An empty file yields ([], [])
        (the original returned ([''], ['']) due to an unconditional final
        append).
    """
    seqs = list()
    info = list()
    seq = ''
    inf = ''
    with open(filename, 'r') as f:
        for line in f:
            line = line.strip()
            if line.startswith('>'):
                if seq != '':
                    # Flush the previous record before starting a new one.
                    seqs.append(seq)
                    info.append(inf)
                    seq = ''
                # Keep only the id token; guard a bare '>' header, which
                # previously raised IndexError on the empty split.
                inf = line[1:].split()[0] if len(line) > 1 else ''
            else:
                seq += line
    if seq != '':
        # Flush the last record; nothing to flush for an empty file.
        seqs.append(seq)
        info.append(inf)
    return info, seqs
|
|
|
|
|
class DataGenerator(object):
    """Endless mini-batch generator over arrays of equal first dimension.

    Wraps dense numpy arrays or sparse matrices (anything row-sliceable;
    with ``is_sparse=True`` each slice is densified via ``.toarray()``).
    After the last batch the generator wraps around to the beginning, so
    iteration never terminates on non-empty data (Keras fit_generator
    contract).
    """

    def __init__(self, batch_size, is_sparse=False):
        """Args:
            batch_size: number of rows per batch.
            is_sparse: if True, row slices support .toarray() (scipy sparse).
        """
        self.batch_size = batch_size
        self.is_sparse = is_sparse

    def fit(self, inputs, targets=None):
        """Bind the data to iterate over.

        Args:
            inputs: a single array/matrix, or a tuple/list of them (all with
                the same number of rows; the first one defines the size).
            targets: optional labels array/matrix aligned with inputs.
        """
        self.start = 0
        self.inputs = inputs
        self.targets = targets
        if isinstance(self.inputs, (tuple, list)):
            self.size = self.inputs[0].shape[0]
        else:
            self.size = self.inputs.shape[0]
        self.has_targets = targets is not None

    def __iter__(self):
        # Backward-compatible addition: allows `for batch in generator`.
        return self

    def __next__(self):
        return self.next()

    def reset(self):
        """Rewind to the first batch."""
        self.start = 0

    def _slice(self, data, batch_index):
        # Row-slice *data*, densifying when the backing store is sparse.
        rows = data[batch_index, :]
        return rows.toarray() if self.is_sparse else rows

    def next(self):
        """Return the next batch: res_inputs or (res_inputs, labels).

        Raises:
            StopIteration: if fit() was given zero samples (the original
                recursed forever in that case, ending in RecursionError).
        """
        if self.size == 0:
            raise StopIteration('DataGenerator has no samples')
        if self.start >= self.size:
            # Wrap around iteratively instead of recursing into next().
            self.reset()
        batch_index = np.arange(
            self.start, min(self.size, self.start + self.batch_size))
        if isinstance(self.inputs, (tuple, list)):
            res_inputs = [self._slice(inp, batch_index) for inp in self.inputs]
        else:
            res_inputs = self._slice(self.inputs, batch_index)
        self.start += self.batch_size
        if self.has_targets:
            labels = self._slice(self.targets, batch_index)
            return (res_inputs, labels)
        return res_inputs
|
|
|
|
|
|
|