# Archives for: May 2023

# Fast async Python: Using aiofiles and aiocsv to parse large CSV files

# This file is a demo of using aiocsv and aiofiles libraries to speed up reading and parsing CSV files.
#
# Start reading this code from the entrypoint function main() below.
#
import asyncio
import aiofiles
from csv import QUOTE_NONNUMERIC
from typing import AsyncGenerator
from aiocsv import AsyncDictWriter, AsyncDictReader


async def read_lines(file: str) -> AsyncGenerator[dict, None]:
    """
    Asynchronously yield the rows of a comma-delimited CSV file as dicts.

    Each yielded item is one row produced by ``AsyncDictReader``.

    :param file: path of the CSV file to read.
    """
    # newline="" hands line endings to the CSV parser untouched; without it,
    # universal-newline translation can alter line breaks that are part of a
    # quoted field.
    async with aiofiles.open(file, mode="r", newline="") as afp:
        async for row in AsyncDictReader(afp, delimiter=","):
            yield row


async def parse_lines(generator: AsyncGenerator[dict, None]) -> AsyncGenerator[dict, None]:
    """
    Run every row of the incoming generator through the parsing step.

    The parsing step is a placeholder for now, so each row is re-yielded
    exactly as it was received.
    """
    async for record in generator:
        # Placeholder: replace this assignment with real per-row parsing.
        parsed = record
        yield parsed


async def save_lines(
        file: str,
        generator: AsyncGenerator[dict, None],
        batch_size: int = 10000,
) -> None:
    """
    Save rows from generator to a CSV file, writing them in batches.

    The header is taken from the keys of the first row. If the generator
    yields nothing, the output file is created empty (no header).

    :param file: path of the CSV file to write (opened as UTF-8, truncated).
    :param generator: async generator yielding one dict per row.
    :param batch_size: number of rows buffered in memory before they are
        written out; keep it reasonable according to your memory constraints.
    :raises ValueError: if ``batch_size`` is smaller than 1.
    """
    # Validate before opening the file so a bad argument does not truncate it.
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    async with aiofiles.open(
            file,
            mode="w",
            encoding="utf-8",
            newline="",
    ) as afp:
        rows = []
        writer = None
        async for item in generator:
            if writer is None:
                # The writer is created lazily because the column names are
                # only known once the first row has arrived.
                header = list(item.keys())
                writer = AsyncDictWriter(
                    afp,
                    header,
                    quoting=QUOTE_NONNUMERIC,
                )
                await writer.writeheader()
            rows.append(item)
            if len(rows) >= batch_size:
                await writer.writerows(rows)
                # Flush once per written batch rather than once per row.
                await afp.flush()
                rows = []
        # Write the rest of the rows, if any. A non-empty buffer implies at
        # least one row was seen, so the writer exists here.
        if rows:
            await writer.writerows(rows)


async def main(in_file, out_file):
    """
    Entry point: stream rows from in_file through the parser into out_file.

    The three stages are chained as async generators, so rows flow from
    reader to parser to writer as they are produced.
    """
    await save_lines(
        file=out_file,
        generator=parse_lines(generator=read_lines(in_file)),
    )


# Placeholder paths for the demo; point them at real files before running.
in_file = "some_input_file.csv"
out_file = "some_output_file.csv"

if __name__ == "__main__":
    # Run the pipeline only when executed as a script, so importing this
    # module does not start reading and writing files.
    asyncio.run(main(in_file, out_file))