Implementation
1. Basic DBG Implementation
Our code for the basic De Bruijn Graph is similar to the DBG implementation from the Data Analytics Practice Opportunity 2021/22 – De Bruijn Graph in Genome Assembly with Millipede Genomes Dataset. We modified the selection method for the starting vertices by only choosing the first k-characters.
class Node:
def __init__(self, label):
self.label = label
self.indegree = 0
self.outdegree = 0
def build_k_mer(reads: list[str], k: int) -> tuple[list, list]:
k_mers, starts = [], []
for read in reads:
k_mers += [read[i:i+k] for i in range(len(read)-k+1)]
starts.append(read[0:k-1])
return k_mers, starts
def debruijnize(kmers: list) -> tuple[dict, dict]:
nodes, edges = dict(), dict()
for kmer in kmers:
kmer1 = kmer[:-1]
kmer2 = kmer[1:]
if kmer1 not in nodes:
nodes[kmer1] = Node(kmer1)
nodes[kmer1].outdegree += 1
if kmer2 not in nodes:
nodes[kmer2] = Node(kmer2)
nodes[kmer2].indegree += 1
if kmer1 not in edges:
edges[kmer1] = []
edges[kmer1].append(kmer2)
return nodes, edges
def construct_eulerian_path(edges: dict[str: list[str]], starts: list[str]) -> str:
best_contig = ""
best_length = 0
for start in starts:
E = deepcopy(edges)
contig = start
current = start
while current in E:
next = E[current][0]
contig += next[-1]
del E[current][0]
if not E[current]:
del E[current]
current = next
if len(contig) > best_length:
best_contig = contig
best_length = len(contig)
return best_contig
def assembly(reads: list, k: int) -> tuple[str, dict, dict, int]:
k_mers, starts = build_k_mer(reads,k)
nodes, edges = debruijnize(k_mers)
trail = construct_eulerian_path(edges, starts)
return trail, nodes, edges, k
2. K-mer optimization
The best k-mer is chosen by running the DBG assembly multiple times with different k-mer sizes. First, the program run the DBG algorithm multiple times with kmer size from the range specified by the user, with an interval of 5. (e.g. 10, 15, 20, etc.) The k-mer size that generated the longest substring will be chosen as bestK. Then, the program run the DBG algorithm again with the k-mer size in the range [bestK – 5, bestK + 5). The k-mer size that produce the longest sequence will be chosen as the find best k-mer size.
3. Process-Based Parallelism
We speed up the k-mer size testing process by using pool.starmap
from the multiprocessing
module. pool.starmap
offers a convenient means of parallelizing the execution of the test_kmer
function across multiple kmer values. Thus, fully leverage multiple processors on a given machine to speed up the k-mer size testing process.
def single_kmer_testing(minK: int, maxK: int, step: int, reads:list):
test_range = list(range(minK, maxK, step))
with Pool() as pool:
results = pool.starmap(assembly, zip(repeat(reads), test_range))
max_length = -math.inf
best_id = -1
for i, result in enumerate(results):
print(f"K: {result[3]} length: {len(result[0])}")
if len(result[0]) >= max_length:
best_id = i
max_length = len(result[0])
return results[best_id]
def test_kmer(minK: int, maxK: int, reads:list):
"""testing kmer size in range [minK, minK]"""
start_time = time.time()
print("First test")
contigs, vertices, edges, bestK = single_kmer_testing(minK, maxK+1, 5, reads)
print("Second test")
minK = max(minK, bestK-5)
maxK = min(maxK, bestK+5)
contigs, vertices, edges, bestK = single_kmer_testing(minK, maxK, 1, reads)
print(f"Best Kmer size: {bestK}")
print(f'Time Taken: {time.time() - start_time}s')
return contigs, vertices, edges, bestK