We've struggled for a while to make our multiprocessing code performant (and in its current state, I'm not actually sure it works). Instead of doing this ourselves (or even trying to fix what we currently have), we should use the optimized torch.utils.data.DataLoader once PyTorch releases v1.1.X.
I dug into this a bit and I think I came up with a solution that will work with the _next_ pytorch release. It relies on the new torch.utils.data.IterableDataset, which allows datasets to be viewed as streams (rather than as indexable objects, which is what they currently must be).
https://pytorch.org/docs/master/data.html#torch.utils.data.IterableDataset
The PyTorch Dataset class assumes every dataset has a __len__ method, which may not hold for allennlp datasets that are lazy or are infinite streams.
PyTorch builds batches by calling __getitem__ with indices chosen by a sampler, so the sampler, not the dataset, decides which instances make up a batch. This does not suit allennlp, where we need control of the full generator to do things like bucketing by length.
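To make the contrast concrete, here is a small sketch using only PyTorch's built-in samplers (no allennlp involved). For a map-style dataset, the batch composition is decided as lists of indices before any instance is read:

```python
from torch.utils.data import BatchSampler, SequentialSampler

# The sampler only ever sees indices, never the instances themselves, so it
# cannot group instances by length without reading them first.
index_batches = list(BatchSampler(SequentialSampler(range(5)), batch_size=2, drop_last=False))
print(index_batches)  # [[0, 1], [2, 3], [4]]
```

The DataLoader then calls __getitem__ once per index in each list, which is why bucketing has to happen either in a custom sampler (which needs random access) or inside the dataset itself.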
I think that this can be implemented in a backward compatible way via:
A torch.utils.data.IterableDataset subclass which performs the functions of our iterators, e.g. loading max_instances_in_memory at a time, sorting by length etc., but still yields single instances.
Modifying MultiProcessDataIterator to create a DataLoader inside its __call__ method (it exposes the same Iterable[TensorDict] API as the DataLoader), where the Iterable[Instance] it is passed is expected to be the IterableDataset subclass from above.
Basically, the current Iterator functionality would move into IterableDataset subclasses, and we would iterate over these datasets using only the DataLoader, whose only jobs would be (1) multiprocessing and (2) converting instances to tensor dicts.
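The IterableDataset part of this proposal could look roughly like the sketch below. The class name BucketingStream and its parameters are hypothetical (max_instances_in_memory mirrors the existing iterator option), plain strings stand in for Instances, and this is not allennlp's implementation:

```python
import itertools
from typing import Any, Callable, Iterable, Iterator, List

from torch.utils.data import DataLoader, IterableDataset


class BucketingStream(IterableDataset):
    """Hypothetical iterator replacement: read `max_instances_in_memory`
    items at a time, sort each chunk by length, and yield single items so
    the DataLoader's own batching still applies."""

    def __init__(self, instances: Iterable[Any], max_instances_in_memory: int,
                 sort_key: Callable[[Any], int]) -> None:
        # `instances` should be re-iterable (e.g. a lazy reader's output)
        # so that each epoch can start a fresh pass over the data.
        self.instances = instances
        self.max_instances_in_memory = max_instances_in_memory
        self.sort_key = sort_key

    def __iter__(self) -> Iterator[Any]:
        stream = iter(self.instances)
        while True:
            chunk: List[Any] = list(itertools.islice(stream, self.max_instances_in_memory))
            if not chunk:
                return
            # Neighbouring items now have similar lengths, so batches built
            # from consecutive items need less padding.
            yield from sorted(chunk, key=self.sort_key)


sentences = ["a bb", "a", "a bb ccc", "a b"]
stream = BucketingStream(sentences, max_instances_in_memory=2,
                         sort_key=lambda s: len(s.split()))
print(list(stream))  # ['a', 'a bb', 'a b', 'a bb ccc']

# collate_fn=list keeps each batch as a plain list of items.
loader = DataLoader(stream, batch_size=2, collate_fn=list)
print(list(loader))  # [['a', 'a bb'], ['a b', 'a bb ccc']]
```

In the real version the collate_fn would turn Instances into a tensor dict. With num_workers > 1, each worker would replay the whole stream unless the dataset shards itself with torch.utils.data.get_worker_info().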
Rather than having wrapper classes like the MultiprocessDatasetIterator/Reader control sharding for dataset readers, dataset readers that expect to be used for multiprocess reading should control their own sharding using torch.utils.data.get_worker_info(). This returns None in the main process, and information about the worker (such as its id and the total number of workers) inside a DataLoader worker process. This could be used to, e.g., read different data shards from a directory.
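A minimal sketch of a reader that shards itself this way. ShardedReader is a hypothetical name, and in-memory lists stand in for the files in a data directory:

```python
from typing import Iterator, List

from torch.utils.data import IterableDataset, get_worker_info


class ShardedReader(IterableDataset):
    """Hypothetical reader that shards its own input: each DataLoader worker
    reads a disjoint subset of shards, so no instance is produced twice."""

    def __init__(self, shards: List[List[str]]) -> None:
        # Each shard stands in for one file in a data directory.
        self.shards = shards

    def __iter__(self) -> Iterator[str]:
        info = get_worker_info()
        if info is None:
            # Main process (num_workers=0): this copy reads every shard.
            worker_id, num_workers = 0, 1
        else:
            worker_id, num_workers = info.id, info.num_workers
        # Round-robin assignment: worker k reads shards k, k + n, k + 2n, ...
        for shard in self.shards[worker_id::num_workers]:
            yield from shard


reader = ShardedReader([["a1", "a2"], ["b1"], ["c1"]])
print(list(reader))  # ['a1', 'a2', 'b1', 'c1']
```

Under DataLoader(reader, num_workers=2, batch_size=None), worker 0 would read the first and third shards and worker 1 the second. Each item is still produced exactly once, although the order may differ from the single-process case.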
Variable-sized batches: some allennlp data functionality returns variable-sized batches, such as batches containing a fixed number of tokens. I'm not sure how to do this with the torch DataLoader yet. I think it involves implementing a BatchSampler that returns index lists of different lengths, but that is complicated because we want to view our datasets as streams rather than indexable objects, so I'm not quite sure how that would work.
Caching instances: I'm not sure how this interacts with multiprocessing, but part of the reason we need it in the first place is that the multiprocessing doesn't work properly anyway.
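One possible approach, sketched here as an assumption rather than a settled design, avoids samplers entirely. The IterableDataset yields whole batches, and batch_size=None disables the DataLoader's automatic batching. The function batches_by_tokens and the class TokenBatchedStream are hypothetical names:

```python
from typing import Iterable, Iterator, List

from torch.utils.data import DataLoader, IterableDataset

Sentence = List[str]


def batches_by_tokens(sentences: Iterable[Sentence], max_tokens: int) -> Iterator[List[Sentence]]:
    """Greedily group a stream of tokenized sentences so that each batch's
    padded size (number of sentences * longest sentence) is at most
    `max_tokens`. A single sentence longer than `max_tokens` gets its own batch."""
    batch: List[Sentence] = []
    longest = 0
    for sentence in sentences:
        new_longest = max(longest, len(sentence))
        if batch and new_longest * (len(batch) + 1) > max_tokens:
            yield batch
            batch, new_longest = [], len(sentence)
        batch.append(sentence)
        longest = new_longest
    if batch:
        yield batch


class TokenBatchedStream(IterableDataset):
    """Hypothetical dataset that yields whole, variable-sized batches."""

    def __init__(self, sentences: List[Sentence], max_tokens: int) -> None:
        self.sentences = sentences
        self.max_tokens = max_tokens

    def __iter__(self) -> Iterator[List[Sentence]]:
        return batches_by_tokens(self.sentences, self.max_tokens)


sentences = [["a"], ["a", "b"], ["a", "b", "c"], ["a"], ["a", "b", "c", "d"]]
# batch_size=None disables automatic batching: every element the dataset
# yields is passed to collate_fn as-is, so it can already be a full batch.
loader = DataLoader(TokenBatchedStream(sentences, max_tokens=6),
                    batch_size=None, collate_fn=lambda batch: batch)
print([[len(s) for s in batch] for batch in loader])  # [[1, 2], [3, 1], [4]]
```

Because the dataset stays a stream, this sidesteps the indexability problem. It could be combined with the sort-within-a-chunk idea so that batches contain similar lengths.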
Below is an example of a very simple wrapper around an allennlp dataset reader that allows it to be used with the PyTorch DataLoader. Currently it is difficult to bucket this reader, because we are forced to access only one element at a time.
from torch.utils.data import Dataset, DataLoader
from allennlp.data.dataset_readers import LanguageModelingReader, DatasetReader
from allennlp.data.dataset import Batch
from allennlp.data.instance import Instance
from allennlp.data.vocabulary import Vocabulary


class AllennlpDataset(Dataset):
    def __init__(self,
                 reader: DatasetReader,
                 dataset_path: str):
        self.reader = reader
        self.dataset_path = dataset_path
        # With lazy=True this is a re-iterable that re-reads the file on each pass.
        self.iterable = self.reader.read(dataset_path)
        self.iterator = iter(self.iterable)
        self._length = None

    def __len__(self):
        """
        This is gross but will go away in the next pytorch release,
        as they are introducing an `IterableDataset` class which doesn't
        need to have a length:
        https://pytorch.org/docs/master/data.html#torch.utils.data.IterableDataset
        """
        if self._length is None:
            # Count instances with a separate full pass over the data.
            self._length = sum(1 for _ in self.iterable)
        return self._length

    def __getitem__(self, idx) -> Instance:
        # idx is ignored: return the next instance from the stream, restarting
        # the stream when it is exhausted. With num_workers > 1 each worker holds
        # its own copy of the stream, so instances may be duplicated across workers.
        next_instance = next(self.iterator, None)
        if next_instance is None:
            self.iterator = iter(self.iterable)
            next_instance = next(self.iterator)
        return next_instance


dataset = AllennlpDataset(LanguageModelingReader(lazy=True), "./baby_data.txt")
# Build the vocab in the main process, after the dataset exists and before any workers start.
vocab = Vocabulary.from_instances(dataset[i] for i in range(len(dataset)))


# Tell the torch DataLoader how to batch up our custom data, i.e. Instances.
def allennlp_collate(instances):
    batch = Batch(instances)
    batch.index_instances(vocab)
    return batch.as_tensor_dict(batch.get_padding_lengths())


loader = DataLoader(dataset, batch_size=2, num_workers=2, collate_fn=allennlp_collate)
for batch in loader:
    print(batch)
Let me know what you think or if you can foresee any other big problems!
@matt-gardner, @joelgrus and @brendan-ai2, would be good to get your opinion on this.
I haven't thought through the details yet, but I like the basic idea a lot.
Seems reasonable to me. Integrating with native pytorch data stuff has been on our wish list for a long time, it just never seemed feasible. If you can make it work, great.
@DeNeutoy Any progress on this front? Progress along these lines, integrating PyTorch DataLoaders, would be ideal combined with a move to DistributedDataParallel.
@sai-prasanna this is something we've bookmarked as a possible change for allennlp v1.0, as it might be difficult to make it backward compatible. Actually, I'm not sure this would affect moving to DistributedDataParallel: from what I understand (I may be wrong about this), in the DistributedDataParallel setup each GPU has its own process, which removes the pressure on generating the data. Maybe you have more information though?
@DeNeutoy I've posted about my attempt to bring in DistributedDataParallel in a comment on #2536. Can you see if that helps?
@brendan-ai2
A move to DDP would require changes in dataset loading to split the dataset across workers. PyTorch provides two ways of doing this, depending on the style of dataset being used.
For map-style datasets, the main process generates the indices using a sampler and sends them to the workers. So any shuffle randomization is done in the main process, which guides loading by assigning indices to load.
For iterable-style datasets, since each worker process gets a replica of the dataset object, naive multi-process loading will often result in duplicated data. Using torch.utils.data.get_worker_info() and/or worker_init_fn, users may configure each replica independently (see the IterableDataset documentation for how to achieve this). For similar reasons, in multi-process loading, the drop_last argument drops the last non-full batch of each worker's iterable-style dataset replica.
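For the map-style case, PyTorch's DistributedSampler is the standard way to give each DDP process a disjoint slice of the indices. In the minimal sketch below, a plain list stands in for the dataset. With shuffle=True, each process would also call sampler.set_epoch(epoch) at the start of every epoch so the shuffling changes between epochs.

```python
from torch.utils.data.distributed import DistributedSampler

# Any object with __len__ can stand in for a map-style dataset here.
dataset = list(range(6))

# Under DDP, num_replicas and rank default to the process group's world size
# and rank; passing them explicitly lets this run without torch.distributed.
shards = {
    rank: list(DistributedSampler(dataset, num_replicas=2, rank=rank, shuffle=False))
    for rank in range(2)
}
print(shards)  # {0: [0, 2, 4], 1: [1, 3, 5]}
```

In a real DDP script the sampler is passed to the DataLoader as sampler=..., and each process only ever loads its own indices.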
@scarecrow1123 / @sai-prasanna Thanks. I don't think it's actually required; it's just a nice-to-have. I think we should _not_ modify existing dataset readers to support this, as people working with DDP are likely to be advanced users, and it is relatively easy to make a dataset reader conform to either of these specs. Once we have DDP and PyTorch dataloaders actually working, if lots of people are using them, we can think about making existing dataset readers work with DDP out of the box. Does that sound good?
@DeNeutoy I am coming from the perspective that making DDP the default way of doing multi-GPU training would be better (if it proves faster for most tasks), and that since 1.0 will have breaking changes anyway, we can solve this problem in that milestone.
I am currently trying out @scarecrow1123's DDP code, using PyTorch's Dataset and DataLoader inside the dataset reader to speed things up.
Here are a few points which I think might make the design user friendly.
Though Dataset requires implementing __len__, it is generally less cognitive overhead for the end user. I believe most datasets can implement it; using true streams (e.g. reading from a database) is not generally the norm.
Instead of wrapping DatasetReader in a PyTorch Dataset, I think it would be preferable to make all DatasetReaders into PyTorch Datasets. This would require removing the _read function and passing the paths for train, test and validation in the constructor.
If we wrap dataset readers, we could only use IterableDatasets, whereas if we replace DatasetReaders with Datasets, we can support both Dataset and IterableDataset.
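A rough sketch of what "a DatasetReader that is a Dataset" might mean. LineDatasetReader is hypothetical, and a dict of tokens stands in for an allennlp Instance:

```python
import os
import tempfile
from typing import Dict, List

from torch.utils.data import Dataset


class LineDatasetReader(Dataset):
    """Hypothetical reader that *is* a map-style Dataset: the path is given
    to the constructor rather than to read(), so train, validation and test
    are simply three instances of the class."""

    def __init__(self, file_path: str) -> None:
        with open(file_path) as data_file:
            self.lines = [line.strip() for line in data_file if line.strip()]

    def __len__(self) -> int:
        return len(self.lines)

    def __getitem__(self, idx: int) -> Dict[str, List[str]]:
        # Stand-in for text_to_instance(); a real reader would return an Instance.
        return {"tokens": self.lines[idx].split()}


with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("the cat sat\n\na dog barked\n")
    train_path = tmp.name

train = LineDatasetReader(train_path)
os.remove(train_path)
print(len(train), train[1])  # 2 {'tokens': ['a', 'dog', 'barked']}
```

This version reads the file eagerly so that __len__ and random access are cheap. A lazy or streaming reader would subclass IterableDataset instead.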
Even when only using DataLoader inside a DatasetReader, I hit an issue where pickling and unpickling during multiprocessing breaks object references, so stateful updates to token indexers, such as the self._added_to_vocabulary = True that happens during indexing, don't happen as expected. The flag is set on the de-pickled token indexer in the worker process, whereas we actually wanted it set on the token indexer referenced by the dataset reader (datasetreader._source_token_indexers).
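The effect can be reproduced without a DataLoader. The worker gets a copy of the indexer, so mutating the copy leaves the original untouched. ToyTokenIndexer is a hypothetical stand-in, and the pickle round trip below mimics the copy a worker receives:

```python
import pickle


class ToyTokenIndexer:
    """Hypothetical stand-in for a token indexer with mutable state."""

    def __init__(self) -> None:
        self._added_to_vocabulary = False

    def index(self) -> None:
        self._added_to_vocabulary = True


original = ToyTokenIndexer()
# Workers receive a copy of the object (pickled under the "spawn" start
# method, copied on fork under "fork"); this round trip mimics that copy.
worker_copy = pickle.loads(pickle.dumps(original))
worker_copy.index()

print(worker_copy._added_to_vocabulary)  # True
print(original._added_to_vocabulary)     # False: the parent never sees the update
```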
@sai-prasanna thanks - all good points! I wasn't sure initially that we were going to pick this up, and the wrapper was the easiest way to make a proof of concept that something like this was possible. We'll definitely be inheriting from one of the dataset classes.
That pickling thing is very annoying. There might be a way round it: ensure that the dataset reader is stateless _when it is building the vocab_, or simply require (initially) that you pre-construct a vocab. I'll be sure to check this when implementing it though. Thanks for your input!
Do you have a link to your modifications of @scarecrow1123's DDP code which internally uses the torch datasets?
Update on this, to figure out precisely what speedup we expect:
The current implementation of multiprocessing does provide small speedups when the data reading function (in allennlp's case, the read function of a particular dataset reader) is _very_ slow. However, for e.g. SNLI, which just reads JSON blobs, there is no speedup over reading in the main process.
Full Multiprocessing Worker Comparison (mean of 5 runs)
DatasetReader: SNLI
Measurement: time taken to read from disk, index, and tensorise 30 batches of 32 instances.
Run                       Mean (s)   Std (s)
baseline                  0.5600     0.0486
pytorch, baseline         0.5434     0.0387
allennlp, two workers     0.7009     0.0475
allennlp, five workers    0.7834     0.0793
pytorch, two workers      0.3529     0.0509
pytorch, five workers     0.2032     0.0101
Full Multiprocessing Worker Comparison (mean of 5 runs)
DatasetReader: SNLI with a forced 0.01 s sleep in text_to_instance to simulate a slow data-generating process
Measurement: time taken to read from disk, index, and tensorise 30 batches of 32 instances.
Run                       Mean (s)   Std (s)
baseline                  12.5646    0.0293
pytorch, baseline         12.4746    0.0368
allennlp, two workers     11.0769    0.0789
allennlp, five workers    10.5293    0.0338
pytorch, two workers      6.3277     0.0738
pytorch, five workers     2.7609     0.0074
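Numbers like these could be collected with a small harness along the lines of the sketch below. It is not the script that produced the tables above. time_batches and slow_batches are hypothetical names, and it reports the sample standard deviation, which may differ from how the tables were computed.

```python
import statistics
import time
from typing import Callable, Iterable, Iterator, Tuple


def time_batches(make_loader: Callable[[], Iterable], num_batches: int = 30,
                 runs: int = 5) -> Tuple[float, float]:
    """Return the mean and sample standard deviation, in seconds, of the time
    needed to pull `num_batches` batches from a freshly built loader.
    `runs` must be at least 2 for the standard deviation to be defined."""
    timings = []
    for _ in range(runs):
        loader = make_loader()
        start = time.perf_counter()
        # For a DataLoader with num_workers > 0, iterating starts the worker
        # processes, so their startup cost is included in the measurement.
        for batch_number, _batch in enumerate(loader, start=1):
            if batch_number == num_batches:
                break
        timings.append(time.perf_counter() - start)
    return statistics.mean(timings), statistics.stdev(timings)


def slow_batches() -> Iterator[int]:
    # Mimics a slow data-generating process, like the forced sleep above.
    for i in range(100):
        time.sleep(0.001)
        yield i


mean, std = time_batches(slow_batches, num_batches=30, runs=3)
print(f"{mean:.4f} {std:.4f}")
```

To compare configurations, make_loader would build, e.g., lambda: DataLoader(dataset, batch_size=32, num_workers=5, collate_fn=...) for each row of the table.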
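So in summary: the current allennlp multiprocessing is slower than the baseline on SNLI and gives only a modest speedup when reading is slow (12.56 s to 10.53 s with five workers), whereas the PyTorch DataLoader with five workers is roughly 2.8x faster on SNLI (0.56 s to 0.20 s) and roughly 4.5x faster with the slow reader (12.56 s to 2.76 s).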
Here is the DatasetReader for seq2seq which uses torch Datasets.
https://gist.github.com/sai-prasanna/4562d73146af8b7a55b4b9d96da5a9a3
Closing as fixed by #3700.