# This file is a demo of using the aiocsv and aiofiles libraries to speed up
# reading and parsing CSV files.
#
# Start reading this code from the entrypoint function main() below.
#
import asyncio
from csv import QUOTE_NONNUMERIC
from typing import AsyncGenerator

import aiofiles
from aiocsv import AsyncDictReader, AsyncDictWriter


async def read_lines(file: str) -> AsyncGenerator[dict, None]:
    """ Read lines from a CSV file. """
    async with aiofiles.open(file, "r") as afp:
        async for row in AsyncDictReader(afp, delimiter=","):
            yield row


async def parse_lines(
    generator: AsyncGenerator[dict, None],
) -> AsyncGenerator[dict, None]:
    """ Parse lines from a generator. """
    async for line in generator:
        # do some parsing here, for example:
        line = line  # no-op placeholder; replace with real transformations
        yield line


async def save_lines(file: str, generator: AsyncGenerator[dict, None]):
    """ Save lines from a generator to a CSV file. """
    async with aiofiles.open(
        file,
        mode="w",
        encoding="utf-8",
        newline="",
    ) as afp:
        rows = []
        writer = None
        async for item in generator:
            if writer is None:
                # build the header from the first row's keys
                header = list(item.keys())
                writer = AsyncDictWriter(
                    afp,
                    header,
                    quoting=QUOTE_NONNUMERIC,
                )
                await writer.writeheader()
            # gather rows into a list and write them out in batches;
            # keep the batch size reasonable for your memory constraints
            rows.append(item)
            if len(rows) >= 10000:
                await writer.writerows(rows)
                rows = []
                await afp.flush()
        # write the remaining rows, if any
        if rows:
            await writer.writerows(rows)


async def main(in_file, out_file):
    """ Main function: read lines from in_file, parse them, and save them to out_file. """
    raw_line_generator = read_lines(in_file)
    parsed_line_generator = parse_lines(generator=raw_line_generator)
    await save_lines(file=out_file, generator=parsed_line_generator)


if __name__ == "__main__":
    in_file = "some_input_file.csv"
    out_file = "some_output_file.csv"

    asyncio.run(main(in_file, out_file))
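

# --- Usage notes (an illustrative sketch, not part of the original demo) ---
#
# The script assumes the third-party dependencies are installed, e.g.:
#
#     pip install aiofiles aiocsv
#
# AsyncDictReader yields every field as a string (like csv.DictReader), so a
# real parse_lines() would typically convert types. For example, assuming a
# hypothetical numeric column named "price", converting it to float also lets
# QUOTE_NONNUMERIC write it without quotes in the output file:
#
#     async def parse_lines(generator):
#         async for line in generator:
#             line["price"] = float(line["price"])  # "price" is hypothetical
#             yield line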