Implementation

Implementation

1. Basic DBG Implementation

Our code for the basic De Bruijn Graph is similar to the DBG implementation from the Data Analytics Practice Opportunity 2021/22 – De Bruijn Graph in Genome Assembly with Millipede Genomes Dataset. We modified the selection method for the starting vertices by only choosing the first k-characters.

class Node:
    def __init__(self, label):
        self.label = label
        self.indegree = 0
        self.outdegree = 0

def build_k_mer(reads: list[str], k: int) -> tuple[list, list]:
    k_mers, starts = [], []
    for read in reads:
        k_mers += [read[i:i+k] for i in range(len(read)-k+1)]
        starts.append(read[0:k-1])
    return k_mers, starts

def debruijnize(kmers: list) -> tuple[dict, dict]:
    nodes, edges = dict(), dict() 
    for kmer in kmers:
        kmer1 = kmer[:-1]
        kmer2 = kmer[1:]
        if kmer1 not in nodes:
            nodes[kmer1] = Node(kmer1)
        nodes[kmer1].outdegree += 1
        if kmer2 not in nodes:
            nodes[kmer2] = Node(kmer2)
        nodes[kmer2].indegree += 1

        if kmer1 not in edges:
            edges[kmer1] = []
        edges[kmer1].append(kmer2)

    return nodes, edges

def construct_eulerian_path(edges: dict[str: list[str]], starts: list[str]) -> str:
    best_contig = ""
    best_length = 0

    for start in starts:
        E = deepcopy(edges)
        contig = start
        current = start

        while current in E:
            next = E[current][0]
            contig += next[-1]
            del E[current][0]
            if not E[current]:
                del E[current]
            current = next

        if len(contig) > best_length:
            best_contig = contig
            best_length = len(contig)

    return best_contig 

def assembly(reads: list, k: int) -> tuple[str, dict, dict, int]:
    k_mers, starts = build_k_mer(reads,k)
    nodes, edges = debruijnize(k_mers)
    trail = construct_eulerian_path(edges, starts)

    return trail, nodes, edges, k 

2. K-mer optimization

The best k-mer is chosen by running the DBG assembly multiple times with different k-mer sizes. First, the program run the DBG algorithm multiple times with kmer size from the range specified by the user, with an interval of 5. (e.g. 10, 15, 20, etc.) The k-mer size that generated the longest substring will be chosen as bestK. Then, the program run the DBG algorithm again with the k-mer size in the range [bestK – 5, bestK + 5). The k-mer size that produce the longest sequence will be chosen as the find best k-mer size.

3. Process-Based Parallelism

We speed up the k-mer size testing process by using pool.starmap from the multiprocessing module. pool.starmap offers a convenient means of parallelizing the execution of the test_kmer function across multiple kmer values. Thus, fully leverage multiple processors on a given machine to speed up the k-mer size testing process.

def single_kmer_testing(minK: int, maxK: int, step: int, reads:list):
    test_range = list(range(minK, maxK, step))
    with Pool() as pool:
        results = pool.starmap(assembly, zip(repeat(reads), test_range))

    max_length = -math.inf
    best_id = -1
    for i, result in enumerate(results):
        print(f"K: {result[3]} length: {len(result[0])}")
        if len(result[0]) >= max_length:
            best_id = i
            max_length = len(result[0])

    return results[best_id] 

def test_kmer(minK: int, maxK: int, reads:list):
    """testing kmer size in range [minK, minK]""" 
    start_time = time.time()
    
    print("First test")
    contigs, vertices, edges, bestK = single_kmer_testing(minK, maxK+1, 5, reads)

    print("Second test")
    minK = max(minK, bestK-5)
    maxK = min(maxK, bestK+5)
    contigs, vertices, edges, bestK = single_kmer_testing(minK, maxK, 1, reads)

    print(f"Best Kmer size: {bestK}")
    print(f'Time Taken: {time.time() - start_time}s')

    return contigs, vertices, edges, bestK