Source code for prism.proofreading

import cleanlog
import numpy as np

import prism.util as util
from prism.hmm import HMMModel

logger = cleanlog.ColoredLogger('insillico-proofreading')
PSEUDO_COUNT = 1

[docs]def hmm_proba(base_pattern, target_patterns, num_base_pattern, hmm_params): l = [] for b in np.split(base_pattern, num_base_pattern): model = HMMModel(b, **hmm_params) l.append([model.proba(pattern) for pattern in target_patterns]) return np.array(l)
[docs]def proofread_given_n_template(header, patterns, hmm_params, n_t=2, num_iter=100): # Abbreviations. # # l_p: Length of patterns. # n_p: Number of patterns. # n_t: Number of base patterns. # t: Template pattern. # l: Likelihood. # wl: Weighted likelihood. # post: Posterior probabilities. l_p, n_p = len(patterns[0]), len(patterns) # Initially we choose totally random base patterns. # We hope that the EM algorithm will find optimal template pattern! t = util.random_pattern(len(patterns[0]), n_t) # The uniform prior will be used. prior = (np.ones(n_t) / n_t).reshape([n_t, 1]) prev_assignment = np.array([-1] * n_p) for i in range(num_iter): l = hmm_proba(t, patterns, n_t, hmm_params) # Posterior can be expressed as a proportion of weighted likelihoods (wl). wl = prior * l post = wl / wl.sum(axis=0) # Each pattern will be assigned to the template pattern with maximum posterior probability. assignment = np.argmax(post, axis=0) # If assignments converged, break the loop. if np.all(assignment == prev_assignment): break prev_assignment = assignment.copy() new_t = [] # If there are some base patterns with no assigned patterns, # add new base patterns. n_assigned_template = len(set(assignment)) if n_assigned_template != n_t: for _ in range(n_t - n_assigned_template): new_t.append(util.random_pattern(l_p, 1)) # Compute new base patterns based with majority voting of the patterns assigned to each of them. for j in set(assignment): new = np.array((patterns[assignment == j].mean(axis=0) >= 0.5).astype(np.int32)) new_t.append(new) t = np.array(new_t).flatten() prior = post.sum(axis=1).reshape([n_t, 1]) # If it's not the last iteration, add pseudocount to the prior. if i != num_iter - 1: prior = prior + PSEUDO_COUNT * (np.ones(n_t) / n_t).reshape([n_t, 1]) prior = prior / prior.sum() base_patterns = np.split(t, n_t) l = hmm_proba(t, patterns, n_t, hmm_params) wl = prior * l post = wl / wl.sum(axis=0) assignment = np.argmax(post, axis=0) sum_loglikelihood = 0 for i, base_pattern in enumerate(base_patterns): model = HMMModel(base_pattern, **hmm_params) for j in range(len(patterns)): if assignment[j] == i: sum_loglikelihood += np.log(model.proba(patterns[j])) # Number of parameters. k = n_t * l_p # Compute Bayesian Information Criterion for model selection. bic = np.log(n_p * l_p) * k - 2 * sum_loglikelihood error_corrected_patterns = np.array([base_patterns[a] for a in assignment]) return bic, header, error_corrected_patterns
[docs]def proofread(params, num_iter=100): header, patterns, hmm_params = params logger.debug(f"Processing {header.split(';')[0]}") len_pattern = len(patterns[0]) num_pattern = len(patterns) max_cluster = min(int(num_pattern / len_pattern), 10) if max_cluster == 0: _, header, error_corrected_patterns = proofread_given_n_template(header, patterns, hmm_params, 10, num_iter) return header, error_corrected_patterns _, header, error_corrected_patterns = min([proofread_given_n_template(header, patterns, hmm_params, n, num_iter) for n in range(1, max_cluster + 1)], key=lambda tup: tup[0]) return header, error_corrected_patterns