Fairseq: Improve memory efficiency of preprocess.py

Created on 18 Oct 2019 · 3 comments · Source: pytorch/fairseq

I'm trying to preprocess a big parallel corpus (200M sentences, or 21 GB per file). For some reason, it is taking up all the available RAM (64 GB).

I tried preprocessing with both the mmap and the lazy dataset implementations. There's probably something I'm not understanding about the preprocessing for it to take so much memory.

I'm also using 28 workers and a joined dictionary. The other arguments have their default values.

enhancement help wanted


All 3 comments

I've looked a bit at the binarize code. From what I understand, there are at most n_workers-1 lines in memory at a time. However, for some reason, memory actually accumulates instead of being freed as the workers go.

I have a feeling that this is related to Python not freeing the memory rather than to the actual code. If that is the case, the problem could probably be fixed by dividing the corpus into more chunks than the number of workers - 1. This makes it possible to use the maxtasksperchild argument of Pool to force Python to give RAM back.

As a test, I did the following modification in preprocess.py:

@@ -1,9 +1,10 @@
-offsets = Binarizer.find_offsets(input_file, num_workers)
+num_tasks = num_workers * 7
+offsets = Binarizer.find_offsets(input_file, num_tasks)
 pool = None
-if num_workers > 1:
+if num_tasks > 1:
-    pool = Pool(processes=num_workers - 1)
-    for worker_id in range(1, num_workers):
-        prefix = "{}{}".format(output_prefix, worker_id)
+    pool = Pool(processes=num_workers - 1, maxtasksperchild=1)
+    for task_id in range(1, num_tasks):
+        prefix = "{}{}".format(output_prefix, task_id)
         pool.apply_async(
             binarize,
             (
@@ -12,8 +13,8 @@
                 vocab,
                 prefix,
                 lang,
-                offsets[worker_id],
-                offsets[worker_id + 1]
+                offsets[task_id],
+                offsets[task_id + 1]
             ),
             callback=merge_result
         )
@@ -21,16 +22,22 @@

 ds = indexed_dataset.make_builder(dataset_dest_file(args, output_prefix, lang, "bin"),
                                     impl=args.dataset_impl, vocab_size=len(vocab))
 merge_result(
     Binarizer.binarize(
         input_file, vocab, lambda t: ds.add_item(t),
         offset=0, end=offsets[1]
     )
 )
-if num_workers > 1:
+if num_tasks > 1:
     pool.join()
-    for worker_id in range(1, num_workers):
-        prefix = "{}{}".format(output_prefix, worker_id)
+    for task_id in range(1, num_tasks):
+        prefix = "{}{}".format(output_prefix, task_id)
         temp_file_path = dataset_dest_prefix(args, prefix, lang)
         ds.merge_file_(temp_file_path)
         os.remove(indexed_dataset.data_file_path(temp_file_path))

This seems to fix the RAM issue. Usage still ramps up, but it goes back down once the workers reach their maximum number of tasks and are destroyed.

Implementing an elegant fix for this might not be trivial, however. Maybe keeping maxtasksperchild at 1 and using an arbitrary maximum chunk size per worker could work. That size can then be used to compute the number of tasks (or chunks) needed, based on the number of workers.
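That heuristic could be sketched roughly as follows (not fairseq's actual API; the function name and the 1M-line cap are made-up placeholders):

```python
import math


def compute_num_tasks(total_lines, num_workers, max_lines_per_task=1_000_000):
    """Split the corpus into enough tasks that no single task exceeds
    max_lines_per_task lines, but never into fewer tasks than workers."""
    tasks_for_memory = math.ceil(total_lines / max_lines_per_task)
    return max(num_workers, tasks_for_memory)


# A 200M-line corpus with 28 workers and a 1M-line cap yields 200 tasks;
# a tiny corpus falls back to one task per worker.
print(compute_num_tasks(200_000_000, 28))
print(compute_num_tasks(10, 28))
```

The returned count would replace the hardcoded num_workers * 7 in the patch above, so the per-task memory footprint stays bounded regardless of corpus size.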

Measuring, or simply estimating, the memory that processing one line takes is also possible. Then you can compute the largest possible chunk w.r.t. the number of workers and the available RAM. An optional memory-usage-limit argument could be added if the user does not want to use all the available RAM.
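A memory-budget variant might look like this sketch (all numbers, including the ~2 KiB per-line estimate, are hypothetical):

```python
def max_chunk_lines(mem_limit_bytes, num_workers, bytes_per_line):
    """Largest chunk size (in lines) such that num_workers chunks being
    processed concurrently stay within the memory budget."""
    return max(1, mem_limit_bytes // (num_workers * bytes_per_line))


# With a 32 GiB budget, 28 workers, and an assumed ~2 KiB per in-flight line:
chunk = max_chunk_lines(32 * 1024**3, 28, 2048)
print(chunk)
```

The budget would default to the machine's available memory and be overridable by the optional limit argument described above.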

Hello PhilippeMarcotte! Thanks for your great work! I ran into the same problem, where the preprocessing takes up more and more RAM. Your script helped me preprocess large datasets successfully.
