Home > Software design >  How to merge hundreds of CSVs, each with millions of rows, using minimal RAM?
How to merge hundreds of CSVs, each with millions of rows, using minimal RAM?

Time:01-17

I have terabytes of sensor data stored across separate CSV files in format timestamp and value. I need to merge these CSV files where I will have one column for timestamp (calculated as the average of all other timestamps for each row) and each sensor will have its own column where the column name comes from the file name. The number of sensors is more than 500. What tech stack to look into? I can't fit all data in RAM at once.

Example:

sensor1.csv
  timestamp  value
  10000        1.9
  10010        2.2
  ... (millions of rows)

sensor2.csv
  timestamp  value
  10004        3.5
  10012        4.3

... (500 more files)

Result should look like this (timestamp in this file is the average of all the timestamps from all 500 input files, names of sensors, e.g. sensor1, sensor2 and etc., come from filenames):

merged_file.csv
timestamp  sensor1  sensor2   ...  sensor500
  10002      1.9      3.5            2.1
  10011      2.2      4.3            3.5

After merge, I would like to store data in a database, e.g. InfluxDB, for future analysis and model trainings. What tools would be best to perform merge and analysis operations on this data?

CodePudding user response:

I see your post as two distinct questions: 1) How to merge 500 CSVs? 2) whatever comes next, some DB?

This is a solution for the first question. I'm using Python, there are languages/runtimes that could do this faster, but I think Python will give you a good first start at the problem and I'm expecting Python will be more accessbile and easier to use for you.

Also, my solution is predicated on the fact that all 500 CSVs have identical row counts.

My solution opens all 500 CSVs at once, creates an outer loop over a set number of rows, and an inner loop over each CSV:

  • The inner loop reads the timestamp and value for a row in each CSV, averaging the 500 timestamps into a single column, and accumulating the 500 distinct values in their own columns, and all that goes into a final merged row with 501 columns.

  • The outer loop repeats that process for as many rows as there are across all 500 CSVs.

I generated some sample data, 500 CSVs each with 1_000_000 rows, for 6.5G of CSVs. I ran the following script on my M1 Macbook Air. It completed in 8.3 minutes and peaked at 34.6M of RAM and produced a final CSV that is about 2G on disk.

import csv
import glob

# Fill this in based on your own knowledge, or, based on the output of 'analyze_stats.py'
NUM_ROWS = 1_000_000

sensor_filenames = sorted(glob.glob('sensor*.csv'))

# Sort: trim leading 6 chars, 'sensor', and trailing 4 chars, '.csv', leaving just the number in the middle
sensor_filenames = sorted(sensor_filenames, key=lambda x: int(x[6:-4]))

# Get handles to all files, and create input CSV readers
sensor_readers = []
for sensor_fname in sensor_filenames:
    f = open(sensor_fname, newline='')
    sensor_readers.append(csv.reader(f))

# Create output CSV writer
f = open('merged_sensors.csv', 'w', newline='')
writer = csv.writer(f)

# Discard all sensor headers
for reader in sensor_readers:
    next(reader)

# Build up output header and write
output_header = ['timestamp']
for sensor_fname in sensor_filenames:
    sensor_name = sensor_fname[:-4]  # trim off '.csv'
    output_header.append(sensor_name)
writer.writerow(output_header)

row_count = 0
while row_count < NUM_ROWS:
    row_count  = 1
    values = []
    timestamps = []
    for reader in sensor_readers:
        row = next(reader)

        ts, val = row
        timestamps.append(int(ts))
        values.append(val)
    
    if row_count % 1000 == 0:
        print(f'Merged {row_count} rows')
 
    avg_ts = int(sum(timestamps)/len(timestamps))
    writer.writerow([avg_ts]   values)

I haven't profiled this, but I believe the only real allocations of memory that add up are going to be:

  • the 500 file handles and CSV readers (which is small) for the entirety of the process
  • each row from the input CSVs in the inner loop
  • the final merged row in the outer loop

At the top of the script I mention analyze_stats.py. Even if this were my data, I'd be very patient and break down the entire process into multiple steps, each which I could verify, and I would ultimately arrive at the correct, final CSV. This is especially true for me trying to help you, because I don't control the data or "know" it like you, so I'm going to offer up this bigger process:

  1. Read all the CSVs and record some stats: headers, column counts, and especially row counts.

  2. Analyze those stats for "conformance", making sure no CSVs deviates from your idea of what it should be, and especially get confirmation that all 500 CSVs have the same number of columns and rows.

  3. Use the proven row count as input into the merge process.

    There are ways to write the merge script so it doesn't have to know "the row count" up front, but it's more code, it's slightly more confusing, and it won't help you if there ever is a problem... you probably don't want to find out on row 2 million that there was problem "somewhere", I know I hate it when that happens.

If you're new to Python or the CSV readers/writers I recommend you read these scripts first.

  • get_sensor_stats.py: reads all your sensor data and records the header, the minimum and maximum number of columns seens, and the row count for each CSV; it writes all those stats out to a CSV

  • analyze_stats.py: reads in the stats CSV and checks to make sure header and column counts meet pre-defined values; it also keeps a tally of all the row counts for each file, and if there are files with different row counts it will let you know

Also, here's the script I used to generate my 6G of sample data:

  • gen_sensor_data.py: is my attempt to meaningfully represent your problem space, both in size and complexity (which is very easy, thankfully
  •  Tags:  
  • Related