Learn Huffman Encoding With Python

In this post I’m going to walk through an implementation of huffman encoding and decoding in Python. For purposes of demonstrating key ideas, I’m going to just treat bits as plaintext strings just to keep things simple. While this means that the output isn’t truly compressed, you will hopefully take away a deeper understanding of how the encoding works by seeing actual code.

Theoretical Overview

Huffman encoding exploits the unequal distribution of character occurrences in text. Rather than encoding every single character with the same number of bites, it encodes characters that occur more frequently with smaller number of bits and those that occur less frequently with greater number of bits.

For example, lets say we have the text abc. Without compression, each character will take up a fixed number of bits - lets say a byte per character using an ASCII character set. That’s 3 bytes for 3 characters or 24 bits total.

If we used a variable length encoding, we can instead use as many bits as we need to identify a character! So if we mapped the characters like so:

a = 0
b = 10
c = 110

Then abc will be 010110 which is only 6 bits. But here’s the catch: we need to make sure that these codes are prefix free, meaning that no code in our set is a prefix of another code. Why? This is best understood with an example. Lets add another character, d, to our previous set.

a = 0
b = 01
c = 110
d = 10

Now consider if we wanted to encode acb, we would have 011001. But upon decoding it, it can be misinterpreted as bdb! Unless we introduce a delimiter into our output bit stream, we can’t tell where the bits of one character ends and another starts. The only way to tell without a delimiter is to have codes that introduce no ambiguity, and you can accomplish that by ensuring that the codes are prefix free.

This presents two additional challenges that the creator of the huffman encoding solved:

  1. How do we generate these prefix free codes?
  2. How do we generate optimal prefix free codes such that we are able to assign shorter codes to higher frequency characters?

The prefix free codes are created by constructing a binary trie. The edges in the tree represent 0’s and 1’s and the leaf nodes of this binary tree represent a unique character in a text. Therefore, the paths represent the code for the character at the leaf. Since the characters are at the leaf nodes, all the paths to those nodes are unique and non-overlapping, making the codes prefix free. To attain optimatily, the trie is constructed bottom up, starting with characters that occur the least often so that the eventual codes (made up of paths from the root of the trie to leaf nodes) are shortest for those that occur the most often.

Implementation Overview

Here’s an overview of both compression and decompression steps:

Compression

  1. read the text and figure out character frequency
  2. use frequency to build trie - this generates the bit sequence in the form of trie paths for each character
  3. generate a code table using trie - this lets us find a corresponding bit sequence code for a character
  4. encode our text using table to produce a bit stream
  5. encode our trie as bits. this will be used by decoder
  6. write both to a file

Decompression

  1. read the trie bits portion (header) to re-construct the trie. we’ll need this to decode the encoded text
  2. read the body / text bits portion (this is the encoded form of the actual value we’re trying to get)

The Trie Node

We’ll be using this to construct our trie. this will be referenced throughout the implementation.

class Node:
    def __init__(self, char="", left=None, right=None, freq=0):
        self.char = char
        self.left = left
        self.right = right
        self.freq = freq

    def to_binary(self):
        return "{:08b}".format(ord(self.char))

    def is_leaf(self):
        return (self.left is None and self.right is None)

    # necessary for heapq comparisons
    # heapq falls back on object comparison when priority keys are equal
    def __lt__(a, b):
        return a.char < b.char

    def __eq__(self, other):
        if isinstance(other, Node):
            return (
                (self.char == other.char) and
                (self.left == other.left) and
                (self.right == other.right)
            )
        return False

    def __repr__(self):
        return "None" if self.char is None else self.char

Compression Process

This is the main method for compression - we’re encoding the text and then we’re including some header metadata for the decoder. The essense of the header metadata is the serialized trie that we constructed for the purposes of encoding.

def compress(text):
    trie_tree = build_trie(text)
    table = build_code_table(trie_tree)
    trie_bits = serialize_trie_to_binary(trie_tree)
    header = "{:016b}{}".format(len(trie_bits), trie_bits)
    body = encode_text(table, text)
    return header + body

The following method uses a min heap to ensure that the most frequently occuring characters (via the freq attribute) are included in our trie structure last.

def build_trie(text):
    from collections import Counter
    from heapq import heappush, heappop
    char_count = Counter(text)
    queue = []
    for char, freq in char_count.items():
        node = Node(char=char, freq=freq)
        heappush(queue, (node.freq, node))
    while len(queue) > 1:
        freq1, node1 = heappop(queue)
        freq2, node2 = heappop(queue)
        parent_node = Node(
            left=node1,
            right=node2,
            freq=freq1 + freq2
        )
        heappush(queue, (parent_node.freq, parent_node))
    freq, root_node = heappop(queue)
    return root_node

This method constructs our character to code hash table. Our trie lets using decode an encoded stream by allowing us to follow the binary node paths to the characters using bit values in a stream. However, we need to create a character to code mapping in order for our constructed trie to be useful in the encoding process. Otherwise, we would need to scan our entire trie using either DFS or BFS searching for a target character (for every character we want to encode).

def build_code_table(node):
    table = {}
    def map_char_to_code(node, value):
        if node.is_leaf():
            table[node.char] = value
            return
        map_char_to_code(node.left, value + "0")
        map_char_to_code(node.right, value + "1")
    map_char_to_code(node, "")
    return table

In order for a decoder to decode our encoded text, it needs to know the character-to-code mapping we used so this method serializes the trie used in the encoding into bits. It uses a pre-order traversal to encode our trie. If it’s a non-leaf node, we prefix the output with a zero. Otherwise, we prefix it with a 1 followed by the actual bits representing the character.

def serialize_trie_to_binary(node): 
    if not node:
        return ""
    if node.is_leaf():
        return "1" + node.to_binary()
    return "0" + serialize_trie_to_binary(node.left) + serialize_trie_to_binary(node.right)

This method makes use of our character-to-code table to convert characters into bits. This represents our compressed text!

def encode_text(table, text):
    output = ""
    for x in text:
        output += table[x]
    return output

Decompression Process

Here’s the main method for decompression. It essentially re-constructs the trie in memory used the bits in the input representing the trie. Then it uses that in-memory trie to decode the bits of the input that represent our encoded text.

def decompress(bit_string):
    trie_size = int(f"{bit_string[0:16]}", 2)
    trie_range_end = 16 + trie_size
    trie = deserialize_binary_trie(bit_string[16:trie_range_end])
    body = bit_string[trie_range_end:]
    return decode_text(trie, body)

This function does the reverse of serialize_trie_to_binary. The recursion here takes advantage of the fact that 1 bits are leafs of our trie, therefore it can be used as a base case to continue de-serializing the next trie path. The curr_pos is used in this function to act as a pointer into our current read position so we know when to start and stop reading input.

def deserialize_binary_trie(bits):
    def read_bits(curr_pos):
        if curr_pos >= len(bits):
            return None, curr_pos
        bit = bits[curr_pos]
        if bit == "1":
            char_range_left = curr_pos+1
            char_range_right = char_range_left + 8
            char_bits = bits[char_range_left:char_range_right]
            return Node(
                char=chr(int(char_bits, 2))
            ), char_range_right
        left_node, pos = read_bits(curr_pos + 1)
        right_node, pos = read_bits(pos)
        return Node(
                left = left_node,
                right = right_node
        ), pos
    node, pos = read_bits(0)
    return node

Finally, with our trie object on hand, this function follows the bits of the encoded text using the trie to find the characters.

def decode_text(node, data):
    out = ""
    root = node
    curr_node = root
    for bit in data:
        if bit == "0":
            curr_node = curr_node.left
        else:
            curr_node = curr_node.right
        if curr_node.is_leaf():
            out += curr_node.char
            curr_node = root
    return out

That completes the overview of this basic python representation of the huffman algorithm. In practice, some implementations may used pre-existing code tables rather than generating them on the fly as we did here. For example, if you need fast encoding and know about average frequencies of the text you’re encoding, you may not want to be constructing a new trie on every encode operation.

References

Here’s a couple of resources I used in writing this implementation - I highly encourage you to check them out to understand huffman in even greater depth.