-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmotif_discovery.py
More file actions
223 lines (175 loc) · 7.71 KB
/
Copy pathmotif_discovery.py
File metadata and controls
223 lines (175 loc) · 7.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import numpy as np
from typing import List, Dict, Tuple
# --- Core Pattern Matching Functions ---
def exact_match_all_occurrences(sequence: str, pattern: str) -> List[int]:
"""
Finds all starting indices of an exact pattern in a sequence.
Args:
sequence: The DNA/RNA sequence (string) to search within.
pattern: The exact pattern (string) to find.
Returns:
A list of starting indices (0-based) where the pattern is found.
Time Complexity: O(L * M), where L is the sequence length and M is the
pattern length (for the naive Python slicing/comparison).
(Optimized algorithms like KMP achieve O(L + M)).
Space Complexity: O(L) in the worst case (if the pattern occurs at every position).
"""
if not sequence or not pattern:
return []
L = len(sequence)
M = len(pattern)
occurrences = []
# Slide the pattern across the sequence
for i in range(L - M + 1):
# Check for exact match
if sequence[i:i + M].upper() == pattern.upper():
occurrences.append(i)
return occurrences
def _calculate_mismatches(subsequence: str, pattern: str) -> int:
"""
Helper function to calculate the number of mismatches (Hamming distance)
between two equal-length strings.
"""
mismatches = 0
# Assumes lengths are equal (M)
for i in range(len(pattern)):
if subsequence[i].upper() != pattern[i].upper():
mismatches += 1
return mismatches
def fuzzy_match_all_occurrences(sequence: str, pattern: str, max_mismatches: int) -> List[Tuple[int, int]]:
"""
Finds all starting indices and mismatch counts of a pattern in a sequence
allowing up to max_mismatches (fuzzy matching).
Args:
sequence: The DNA/RNA sequence (string) to search within.
pattern: The pattern (string) to find.
max_mismatches: The maximum allowed number of differences.
Returns:
A list of tuples (starting_index, mismatch_count).
Time Complexity: O(L * M), where L is the sequence length and M is the
pattern length (due to sliding window and internal comparison).
Space Complexity: O(L) in the worst case.
"""
if not sequence or not pattern:
return []
L = len(sequence)
M = len(pattern)
occurrences = []
# Slide the pattern across the sequence
for i in range(L - M + 1):
subsequence = sequence[i:i + M]
mismatches = _calculate_mismatches(subsequence, pattern)
# Check if mismatches are within the allowed threshold
if mismatches <= max_mismatches:
occurrences.append((i, mismatches))
return occurrences
# --- Motif Conservation Analysis ---
class MotifConservationAnalyzer:
"""
Analyzes a set of aligned motifs to determine conservation.
Calculates the Profile Matrix, Consensus Sequence, and Information Content.
"""
def __init__(self, motifs: List[str]):
"""
Initializes the analyzer with a list of aligned motif sequences.
Args:
motifs: A list of motif strings (must be of equal length).
Raises:
ValueError: If motifs are not of equal length or the list is empty.
"""
if not motifs:
raise ValueError("Motif list cannot be empty.")
M = len(motifs[0])
if not all(len(m) == M for m in motifs):
raise ValueError("All motifs must be of the same length.")
self.motifs = [m.upper() for m in motifs]
self.num_motifs = len(motifs)
self.motif_length = M
self.bases = ['A', 'C', 'G', 'T']
self.profile_matrix = None
self.consensus_sequence = ""
self.information_content = 0.0
def _calculate_profile_matrix(self) -> None:
"""
Calculates the Position Frequency Matrix (PFM) and converts it to a
Position Probability Matrix (PPM) or Profile Matrix.
Time Complexity: O(N * M), where N is the number of motifs and M is the length.
Space Complexity: O(4 * M) for the matrix.
"""
M = self.motif_length
N = self.num_motifs
# PFM: 4 rows (A, C, G, T), M columns (positions)
pfm = np.zeros((len(self.bases), M))
base_to_index = {base: i for i, base in enumerate(self.bases)}
# Count occurrences at each position
for motif in self.motifs:
for i in range(M):
base = motif[i]
if base in base_to_index:
pfm[base_to_index[base], i] += 1
# PPM/Profile Matrix (Frequency matrix)
# Add a small pseudo-count (e.g., 0) or use the raw frequency
self.profile_matrix = pfm / N
def _calculate_consensus(self) -> str:
"""
Determines the consensus sequence by selecting the most frequent base
at each position based on the profile matrix.
"""
if self.profile_matrix is None:
self._calculate_profile_matrix()
consensus = []
# Find the index (base) with the maximum frequency at each column (position)
max_indices = np.argmax(self.profile_matrix, axis=0)
for index in max_indices:
consensus.append(self.bases[index])
self.consensus_sequence = "".join(consensus)
return self.consensus_sequence
def _calculate_information_content(self) -> float:
"""
Calculates the Total Information Content (Conservation Score) of the motif.
Information Content (IC) at position 'j' is:
IC(j) = 2 + sum(P(b, j) * log2(P(b, j))) for all bases 'b'
Total IC = sum(IC(j)) over all positions 'j'.
P(b, j) is the frequency of base 'b' at position 'j'.
Time Complexity: O(4 * M)
Space Complexity: O(1)
"""
if self.profile_matrix is None:
self._calculate_profile_matrix()
# Assuming background probability p(b) = 0.25 (p=1/4)
background_prob = 0.25
total_ic = 0.0
# Iterate over each position (column)
for j in range(self.motif_length):
ic_j = 0.0
# Iterate over each base (row)
for i in range(len(self.bases)):
P_b_j = self.profile_matrix[i, j] # Observed frequency
# Entropy calculation: P * log2(P)
# Avoid log(0) by checking for P_b_j > 0
if P_b_j > 0:
ic_j += P_b_j * np.log2(P_b_j)
# Conservation is 2 - Entropy (since max entropy is -2 for 4 bases)
# IC(j) = 2 - (-sum(P(b,j) * log2(P(b,j))))
ic_j = 2.0 + ic_j
total_ic += ic_j
self.information_content = round(total_ic, 4)
return self.information_content
def analyze(self) -> Dict:
"""
Performs the complete motif conservation analysis.
Returns:
A dictionary containing the profile matrix, consensus, and conservation score.
"""
self._calculate_profile_matrix()
self._calculate_consensus()
self._calculate_information_content()
# Format profile matrix for cleaner output
profile_dict = {base: self.profile_matrix[i].round(4).tolist()
for i, base in enumerate(self.bases)}
return {
'Profile_Matrix': profile_dict,
'Consensus_Sequence': self.consensus_sequence,
'Total_Information_Content': self.information_content,
'Interpretation': f"Score ranges from 0 (low conservation) to {self.motif_length * 2} (perfect conservation)."
}