Optimizing Searching a File for a Binary Sequence in Python
I recently spent some time optimizing a deserialization tool at work. I wanted to walk through my process for optimizing this Python code.
It’s a truism that if you want speed, Python is probably not the right language to be using. However, that doesn’t mean there aren’t situations where Python optimization is useful.
My employer, PointOne Navigation, provides an open source Python library for processing data from our GPS receivers: https://github.com/PointOneNav/fusion-engine-client. The data uses a proprietary protocol that’s designed to support lossy transports like an RS232 serial connection. While we also provide C++ tools, the graphical analysis is all Python-based. We’re a small team and don’t have the resources to manage building compiled wheels, so our Python package is pure Python code.
“Framing” the Problem
The protocol is pretty straightforward:
Field | Data Type | Description |
---|---|---|
Sync Byte 0 | u8 | Always 0x2E (ASCII “.”) |
Sync Byte 1 | u8 | Always 0x31 (ASCII “1”) |
Reserved | u8[2] | Reserved for future use. |
CRC | u32 | The 32-bit CRC of all bytes from and including the protocol version field to the last byte in the message, including the message payload. This uses the standard CRC-32 generator polynomial in reversed order (0xEDB88320). |
Protocol Version | u8 | The version of the FusionEngine Protocol. |
Message Version | u8 | The version of this message. |
Message Type | u16 | Uniquely identifies each message type. The combination of Version and Type should be used to know how to decode the payload. |
Sequence Number | u32 | A sequence number that is incremented with each message. |
Payload Size | u32 | Size of the payload to follow in bytes. |
Source Identifier | u32 | Identifies the source of the message when applicable. This definition can change depending on the message type. |
Payload | u8[N] | “Payload Size” bytes making up the contents of the “Message Type” message. |
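To make the layout concrete, here is a minimal sketch that unpacks the 24-byte header with Python’s struct module. The Header tuple and parse_header function are hypothetical names used for illustration; they are not the fusion_engine_client API. The little-endian byte order is also an assumption, chosen to match the 0x312E preamble constant used in the benchmarks later in this post.

```python
import struct
from typing import NamedTuple

# Field layout from the table above. Little-endian byte order is assumed.
HEADER_FORMAT = '<BB2sIBBHIII'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 24 bytes


class Header(NamedTuple):
    """Hypothetical container for the header fields (not the library's MessageHeader)."""
    sync0: int
    sync1: int
    reserved: bytes
    crc: int
    protocol_version: int
    message_version: int
    message_type: int
    sequence_number: int
    payload_size: int
    source_identifier: int


def parse_header(buffer: bytes, offset: int = 0) -> Header:
    """Unpack the fixed-size header that starts at `offset` in `buffer`."""
    return Header(*struct.unpack_from(HEADER_FORMAT, buffer, offset))
```

The payload follows immediately after these 24 bytes, so a full message occupies HEADER_SIZE + payload_size bytes.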
The typical use case for our tools is to analyze data captured from a serial port and logged directly to a file. This data is typically a mix of our FusionEngine (FE) protocol and other types of data.
Our tools work by building and caching an index of the FE messages in the log to speed up subsequent runs.
While it can vary a lot, we can generate up to about 250MB of data an hour. That makes 24-hour data collections challenging: at that rate a single log can reach about 6GB, and Python would struggle to manipulate data objects that large in memory (Python will often generate multiple copies of the same data).
Since indexing time was becoming a pain point and was a very well-defined problem, I decided it would be a good target for optimization.
The original approach
Here’s a simplified form of the original code:
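import os

from fusion_engine_client.messages.defs import MessageHeader

# Body of the indexer's scan loop; self.input_file is the log opened in 'rb' mode.
while True:
    start_offset_bytes = self.input_file.tell()
    chunk = self.input_file.read(1)
    if not chunk:
        break  # End of file
    byte0 = chunk[0]
    while True:
        if byte0 == MessageHeader.SYNC0:
            chunk = self.input_file.read(1)
            if not chunk:
                break  # End of file
            byte1 = chunk[0]
            if byte1 == MessageHeader.SYNC1:
                header = MessageHeader()
                data = self.input_file.read(header.calcsize())
                header.unpack(data)
                data += self.input_file.read(header.payload_size_bytes)
                if header.validate_crc(data):
                    pass  # Message found! Record start_offset_bytes in the index.
                else:
                    # CRC failed: resume scanning one byte past the candidate preamble.
                    self.input_file.seek(start_offset_bytes + 1, os.SEEK_SET)
                break
            else:
                # byte1 might itself be the first sync byte of a preamble.
                start_offset_bytes += 1
                byte0 = byte1
        else:
            break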
Effectively, this reads through the file byte by byte looking for the two-byte preamble, then checks whether the data that follows passes a CRC check.
There are many ways to speed this up (besides just switching to another language):
- System calls can be expensive. Generally, it’s faster if you can read a file in large chunks.
- Pure Python is slow. Using a library with native code optimizations can improve speed.
- Although Python has only recently started to support truly parallel multi-threading, parallelizing the processing can still provide speed-ups.
- Avoiding unnecessary CRC computations could speed things up, depending on how often the preamble appears in the data contents. Adding more checks, like requiring the reserved bytes to be 0 or only processing messages with known message IDs, could reduce the number of CRC checks needed.
- Instead of checking every offset, I could use something like the Boyer-Moore string-search algorithm.
- While I didn’t explore it here, I could potentially use a GPU for a speed-up.
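As a rough illustration of the cheaper-checks idea, the sketch below uses the header fields to reject candidates before paying for a CRC. Several details are assumptions made for illustration: the helper name, the little-endian layout, the rule that reserved bytes are zero, and the 12KB payload cap (borrowed from the MAX_MSG_SIZE used later). It also assumes the protocol’s CRC matches Python’s zlib.crc32, which implements the standard CRC-32 with the reflected polynomial 0xEDB88320.

```python
import struct
import zlib

# Hypothetical helper, not part of fusion_engine_client. The header layout follows
# the field table; little-endian byte order is assumed.
HEADER_FORMAT = '<BB2sIBBHIII'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 24 bytes
SYNC = (0x2E, 0x31)
CRC_START = 8  # The CRC covers everything from the protocol version field onward.
MAX_PAYLOAD = 12 * 1024  # Assumed upper bound on a plausible payload size.


def looks_like_message(data: bytes, offset: int) -> bool:
    """Run cheap sanity checks first and only compute the CRC if they all pass."""
    if len(data) - offset < HEADER_SIZE:
        return False
    (sync0, sync1, reserved, crc, _proto_ver, _msg_ver, _msg_type,
     _seq, payload_size, _source) = struct.unpack_from(HEADER_FORMAT, data, offset)
    if (sync0, sync1) != SYNC:
        return False
    # Assumption: valid messages always have zeroed reserved bytes.
    if reserved != b'\x00\x00' or payload_size > MAX_PAYLOAD:
        return False
    end = offset + HEADER_SIZE + payload_size
    if end > len(data):
        return False
    # zlib.crc32 is the standard CRC-32 (reflected polynomial 0xEDB88320),
    # assumed here to match the protocol's CRC definition.
    return zlib.crc32(data[offset + CRC_START:end]) == crc
```

The ordering is what matters. Unpacking a few integers and comparing them costs far less than running a CRC over the whole payload, so most chance matches of the preamble should be rejected before the CRC ever runs.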
Optimization process
The basic problem of finding all instances of a byte sequence in a file seemed like it should already be a solved problem.
My first thought was to leverage existing Linux tools. grep and ripgrep can be surprisingly fast even for binary data, and can be used to process a file in parallel. However, since this library needs to support Windows and have only Python as a dependency, these were off the table.
The most applicable Stack Overflow answer I found was also surprisingly unhelpful: https://stackoverflow.com/questions/37078978/the-fastest-way-to-find-a-string-inside-a-file. While it’s not bad advice for a small file, it didn’t really address the multi-gigabyte files I was working with.
So I went back to fundamentals and did some profiling to find the bottleneck.
A quick note on profiling: I’m using a relatively powerful laptop with an SSD, 40GB of memory, and 16 cores. The details of the workload and the specs of the machine could easily change which optimizations turn out to be the most fruitful.
from datetime import datetime
import struct
import os

file_path = '/logs/input.raw'
file_size = os.stat(file_path).st_size
READ_SIZE = 80 * 1024
PREAMBLE = 0x312E  # b'.1' read as a little-endian uint16

########################### Un-optimized Speed (6.4 MB/s)
# Baseline: the original byte-by-byte loop shown earlier.

########################### READ ONLY (2259.7 MB/s)
with open(file_path, 'rb') as fd:
    start_time = datetime.now()
    while True:
        data = fd.read(READ_SIZE)
        if len(data) == 0:
            break
elapsed_sec = (datetime.now() - start_time).total_seconds()
print(f'Read only rate: {file_size / elapsed_sec / 1e6} MB/s')

########################### V1 (12.5 MB/s)
offsets = []
total_bytes_read = 0
with open(file_path, 'rb') as fd:
    start_time = datetime.now()
    while True:
        data = fd.read(READ_SIZE)
        if len(data) == 0:
            break
        for i in range(len(data) - 1):
            # Compare a two-byte slice against the preamble.
            if data[i:i + 2] == b'.1':
                offsets.append(total_bytes_read + i)
        total_bytes_read += len(data)
elapsed_sec = (datetime.now() - start_time).total_seconds()
print(f'V1 rate: {file_size / elapsed_sec / 1e6} MB/s')

########################### V2 (18.0 MB/s)
offsets = []
total_bytes_read = 0
with open(file_path, 'rb') as fd:
    start_time = datetime.now()
    while True:
        data = fd.read(READ_SIZE)
        if len(data) == 0:
            break
        # Check the even offsets (the last read may be shorter than READ_SIZE).
        even_words = len(data) // 2
        words0 = struct.unpack(f'<{even_words}H', data[:2 * even_words])
        for i in range(len(words0)):
            if words0[i] == PREAMBLE:
                offsets.append(total_bytes_read + i * 2)
        # Check the odd offsets
        odd_words = (len(data) - 1) // 2
        words1 = struct.unpack(f'<{odd_words}H', data[1:1 + 2 * odd_words])
        for i in range(len(words1)):
            if words1[i] == PREAMBLE:
                offsets.append(total_bytes_read + i * 2 + 1)
        total_bytes_read += len(data)
elapsed_sec = (datetime.now() - start_time).total_seconds()
print(f'V2 rate: {file_size / elapsed_sec / 1e6} MB/s')
I spent some time experimenting with READ_SIZE to find a size that worked best on my system.
This was a very informative initial test. First, it showed that when reading in chunks, disk IO was not a concern at all: reading alone ran at about 2,260 MB/s. Second, it showed that the CRC checks were not dominating the processing time. Even with them removed entirely, the pure-Python search was only 2x-3x faster than the original (12.5 to 18.0 MB/s versus 6.4 MB/s).
If I was going to get more significant speed-ups, I’d need to keep focusing on the preamble search step.
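For the READ_SIZE experiment, a sweep along these lines is enough. This is a sketch rather than code from the original benchmarks. measure_read_rates is a hypothetical helper, and it times reads with time.perf_counter (a monotonic, high-resolution clock) instead of datetime.now().

```python
import os
import time
from typing import Dict, Iterable


def measure_read_rates(path: str, sizes: Iterable[int]) -> Dict[int, float]:
    """Return read throughput in MB/s for each chunk size in `sizes`."""
    file_size = os.stat(path).st_size
    rates = {}
    for size in sizes:
        with open(path, 'rb') as fd:
            start = time.perf_counter()
            # Read the whole file in `size`-byte chunks and discard the data.
            while fd.read(size):
                pass
            elapsed = time.perf_counter() - start
        rates[size] = file_size / max(elapsed, 1e-9) / 1e6
    return rates
```

After the first pass, the file is likely to be in the OS page cache, so later sizes can look faster than they would on a cold read. Repeating the sweep or shuffling the order of sizes makes the comparison fairer.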
My next set of tests looked at pushing the processing into NumPy, which runs natively optimized code:
from datetime import datetime
import os
import numpy as np

file_path = '/logs/input.raw'
file_size = os.stat(file_path).st_size
READ_SIZE = 80 * 1024
READ_WORDS = int(READ_SIZE / 2)
PREAMBLE = 0x312E  # b'.1' read as a little-endian uint16
WORD_DTYPE = np.dtype('<u2')  # Explicitly little-endian uint16

########################### V3 (1444.5 MB/s)
offsets = []
total_bytes_read = 0
with open(file_path, 'rb') as fd:
    start_time = datetime.now()
    while True:
        words = np.fromfile(fd, dtype=WORD_DTYPE, count=READ_WORDS)
        if len(words) == 0:
            break
        # Even offsets: word index i starts at byte 2 * i.
        offsets += (2 * np.where(words == PREAMBLE)[0] + total_bytes_read).tolist()
        # Odd offsets: combine the high byte of word i with the low byte of word i + 1.
        # Bytes "AA 2E | 31 AA" -> words 0x2EAA, 0xAA31 -> odd-aligned word 0x312E.
        odd_words = (words[:-1] >> 8) | (words[1:] << 8)
        offsets += (2 * np.where(odd_words == PREAMBLE)[0] + 1 + total_bytes_read).tolist()
        total_bytes_read += 2 * len(words)
elapsed_sec = (datetime.now() - start_time).total_seconds()
print(f'V3 rate: {file_size / elapsed_sec / 1e6} MB/s')

########################### V4 (2420.5 MB/s)
offsets = []
total_bytes_read = 0
with open(file_path, 'rb') as fd:
    start_time = datetime.now()
    while True:
        data = fd.read(READ_SIZE)
        if len(data) == 0:
            break
        # Even offsets. count= avoids an error if the final read has an odd length.
        even_words = np.frombuffer(data, dtype=WORD_DTYPE, count=len(data) // 2)
        sync_matches = 2 * np.where(even_words == PREAMBLE)[0]
        # Odd offsets: reinterpret the buffer starting one byte in.
        odd_words = np.frombuffer(data[1:], dtype=WORD_DTYPE, count=(len(data) - 1) // 2)
        sync_matches = np.concatenate((sync_matches, 2 * np.where(odd_words == PREAMBLE)[0] + 1))
        offsets += (sync_matches + total_bytes_read).tolist()
        total_bytes_read += len(data)
elapsed_sec = (datetime.now() - start_time).total_seconds()
print(f'V4 rate: {file_size / elapsed_sec / 1e6} MB/s')
These tests are a great advertisement for NumPy. Even though this wasn’t a typical numeric analysis task, I still got an enormous speed-up (from 18.0 MB/s to over 2,400 MB/s for the search alone) by moving to native (presumably vector-instruction-optimized) code.
Even after adding the CRC checks back in, the NumPy code ran at about 232 MB/s.
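A variant not benchmarked here compares bytes rather than 16-bit words. Viewing the buffer as uint8 and ANDing two shifted comparisons finds matches at both even and odd alignments in one pass and sidesteps byte-order questions. The function name is hypothetical. Because it allocates boolean temporaries for every chunk, whether it actually beats the uint16 approach is something to measure rather than assume.

```python
import numpy as np

PREAMBLE_BYTES = (0x2E, 0x31)  # b'.1'


def find_preamble_offsets(data: bytes) -> np.ndarray:
    """Return every offset where b'.1' starts, at any alignment, in a single pass."""
    b = np.frombuffer(data, dtype=np.uint8)
    # Byte i must be the first sync byte and byte i + 1 the second.
    hits = (b[:-1] == PREAMBLE_BYTES[0]) & (b[1:] == PREAMBLE_BYTES[1])
    return np.flatnonzero(hits)
```

For example, find_preamble_offsets(b'xx.1yy.1.1z.') returns the offsets 2, 6, and 8.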
But why stop there? I should be able to get significant speed-ups with parallelism. When I did this project, Python hadn’t yet introduced parallel multi-threading, so I needed to use the multiprocessing library to get the full benefit of parallel execution.
from datetime import datetime
from multiprocessing import Pool
import os
from typing import List

import numpy as np

from fusion_engine_client.messages.defs import MessageHeader

file_path = '/logs/input.raw'
MAX_MSG_SIZE = 1024 * 12
READ_SIZE = 80 * 1024
READ_WORDS = int(READ_SIZE / 2)
PREAMBLE = 0x312E
NUM_THREADS = 8  # Number of worker processes


def process_func(block_starts: List[int]) -> List[int]:
    offsets = []
    header = MessageHeader()
    with open(file_path, 'rb') as fd:
        for block_offset in block_starts:
            fd.seek(block_offset)
            # Read past the end of the block so a message starting near the end can still be validated.
            data = fd.read(READ_SIZE + MAX_MSG_SIZE)
            # Only search for preambles that start inside this block; the next block handles the rest.
            even_count = min(READ_WORDS, len(data) // 2)
            odd_count = min(READ_WORDS, (len(data) - 1) // 2)
            sync_matches = (2 * np.where(np.frombuffer(data, dtype='<u2', count=even_count) == PREAMBLE)[0]).tolist()
            # Odd-aligned candidates: reinterpret the buffer starting one byte in.
            sync_matches += (2 * np.where(np.frombuffer(data[1:], dtype='<u2', count=odd_count) == PREAMBLE)[0] + 1).tolist()
            for i in sync_matches:
                try:
                    header.unpack(buffer=data[i:], validate_crc=True, warn_on_unrecognized=False)
                    offsets.append(i + block_offset)
                except Exception:
                    # Not a valid message (bad CRC or truncated data).
                    pass
    return offsets


def main():
    file_size = os.stat(file_path).st_size
    print(f'File size: {int(file_size / 1024 / 1024)}MB')
    # Split the file into READ_SIZE blocks and give each worker a contiguous run of them.
    block_starts = []
    num_blocks = int(np.ceil(file_size / READ_SIZE))
    chunks, chunk_remainder = divmod(num_blocks, NUM_THREADS)
    byte_offset = 0
    for i in range(NUM_THREADS):
        blocks = chunks
        if i < chunk_remainder:
            blocks += 1
        block_starts.append(list(range(byte_offset, byte_offset + blocks * READ_SIZE, READ_SIZE)))
        byte_offset += blocks * READ_SIZE
    print(f'Reads/thread: {len(block_starts[0])}')
    start_time = datetime.now()
    offsets = []
    with Pool(NUM_THREADS) as p:
        for o in p.map(process_func, block_starts):
            offsets += o
    elapsed_sec = (datetime.now() - start_time).total_seconds()
    print(f'Messages found: {len(offsets)}')
    print(f'Rate: {file_size / elapsed_sec / 1e6} MB/s')


# The guard is required where worker processes are spawned (e.g. Windows),
# since each worker re-imports this module.
if __name__ == '__main__':
    main()
This brought the speed, including the CRC checks, up to about 900MB/s. That figure would probably improve with longer logs. I’d been testing with a 1.3GB log, which takes only about 1.4 seconds at this speed, so a large portion of the time is probably spent initializing the worker processes.
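The work-splitting part of main() can be pulled out into a pure function, which makes it easy to check that every block is assigned to exactly one worker. The sketch below uses a hypothetical split_blocks name. It mirrors main(), except that it uses integer ceiling division instead of np.ceil.

```python
from typing import List


def split_blocks(file_size: int, read_size: int, num_workers: int) -> List[List[int]]:
    """Assign contiguous runs of block start offsets to each worker (hypothetical helper)."""
    num_blocks = -(-file_size // read_size)  # Ceiling division without a float round trip.
    per_worker, remainder = divmod(num_blocks, num_workers)
    assignments = []
    byte_offset = 0
    for i in range(num_workers):
        # The first `remainder` workers each take one extra block.
        blocks = per_worker + (1 if i < remainder else 0)
        assignments.append(list(range(byte_offset, byte_offset + blocks * read_size, read_size)))
        byte_offset += blocks * read_size
    return assignments
```

For example, a file of 10 * 1024 + 1 bytes split into 1024-byte blocks across 4 workers gives 11 blocks, assigned 3, 3, 3, and 2.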
Of course, these were all simplified test applications, and they all have some subtle bugs (mostly concerning messages that span from one read to the next).
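One way to close the read-boundary gap for the preamble itself is to carry the last len(PREAMBLE) - 1 bytes of each chunk into the next search window. The sketch below uses a hypothetical helper written for this illustration, not code from the final implementation. For brevity it uses the built-in bytes.find (implemented in C) rather than NumPy. The same carry-over also works with the NumPy searches, as long as match positions are offset by the length of the carried prefix.

```python
from typing import BinaryIO, List

PREAMBLE = b'.1'


def find_offsets_chunked(fd: BinaryIO, read_size: int) -> List[int]:
    """Find every preamble offset, including ones that straddle two reads."""
    offsets = []
    keep = len(PREAMBLE) - 1  # Bytes carried over so a split preamble is still seen.
    carry = b''
    carry_start = 0  # File offset of the first byte in `carry`.
    while True:
        chunk = fd.read(read_size)
        if not chunk:
            break
        window = carry + chunk  # Starts at file offset carry_start.
        pos = window.find(PREAMBLE)
        while pos != -1:
            offsets.append(carry_start + pos)
            pos = window.find(PREAMBLE, pos + 1)
        # A match lying entirely inside the carried bytes is impossible (they are
        # shorter than the preamble), so nothing is reported twice.
        carry_start += len(window) - keep
        carry = window[-keep:]
    return offsets
```

Even with a one-byte read size, every preamble is reported exactly once.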
You can see the final code at https://github.com/PointOneNav/fusion-engine-client/blob/master/python/fusion_engine_client/parsers/fast_indexer.py