Ignite: Provide tiny wrapper over pytorch ThroughputBenchmark

Created on 19 Mar 2020 · 11 Comments · Source: pytorch/ignite

🚀 Feature

PyTorch's torch.utils module has provided ThroughputBenchmark since 1.2.0:

>>> from torch.utils import ThroughputBenchmark
>>> bench = ThroughputBenchmark(my_module)
>>> # Pre-populate benchmark's data set with the inputs
>>> for input in inputs:
...     # Both args and kwargs work, same as any PyTorch Module / ScriptModule
...     bench.add_input(input[0], x2=input[1])
>>> # Inputs supplied above are randomly used during the execution
>>> stats = bench.benchmark(
...     num_calling_threads=4,
...     num_warmup_iters=100,
...     num_iters=1000,
... )
>>> print("Avg latency (ms): {}".format(stats.latency_avg_ms))
>>> print("Number of iterations: {}".format(stats.num_iters))

It would be interesting to provide a tiny wrapper over this to simplify usage with ignite.

enhancement help wanted

All 11 comments

Hi,
I would like to contribute. :)
But the documentation of throughput_benchmark is rather short.
For example, I can't figure out what the point of x2=input[1] is.
Looking at the documentation, it doesn't seem like anything is done with this value.

Looking at the C/C++ binding (which I have very little experience with), https://github.com/pytorch/pytorch/blob/master/torch/csrc/utils/throughput_benchmark.cpp#L108, I can't quite tell whether x2 is used or not. I would assume that the label of the data point is just discarded in the benchmark?

But aside from the internal working, how would you like to structure the wrapper?
What should the ignite wrapper provide for value? Should it automatically move the model to a given device similar to create_supervised_*? Should it optionally create a JIT trace?
Or should it just attach to an engine with mostly the same code?

Thanks,
Kai

I would like to contribute. :)

Hi @kai-tub, that's great! Thanks for that.

Well, I agree that, first of all, we need to understand how ThroughputBenchmark works.

I think input in the example are just samples (without targets), same as the inputs of forward of the module ...

But aside from the internal working, how would you like to structure the wrapper?
What should the ignite wrapper provide for value? Should it automatically move the model to a given device similar to create_supervised_*? Should it optionally create a JIT trace?
Or should it just attach to an engine with mostly the same code?

For now, I have no precise idea about the wrapper. Original example code already looks very simple and concise. Basically, the idea is to get the usage down to 1-2 lines, or to put it somewhere as an integration... I need to think about that...

I think input in the example are just samples (without targets), same as the inputs of forward of the module ...

But given that it seems to be a tuple, it looks more like a batch to me.

Original example code already looks very simple and concise.

I agree

I need to think about that...

Sure! Feel free to throw around some ideas. :)

Ok, so I've done a couple of quick experiments:

import torch
from torch import nn
from torch.utils import ThroughputBenchmark

class Minimal(nn.Module):
    def __init__(self):
        super(Minimal, self).__init__()
        self.fc1 = nn.Linear(10, 20)
        self.fc2 = nn.Linear(20, 10)

    def forward(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        return x

dummy_data = torch.rand(32, 10)
model = Minimal()
model.eval()

bench = ThroughputBenchmark(model)

for forward_input in dummy_data:
    bench.add_input(forward_input)

bench.run_once(forward_input)  # test once

stats = bench.benchmark(
    num_calling_threads=4,
    num_warmup_iters=500,
    num_iters=1000,
)
print(stats)  # Has all of the important information

This is a minimal working example for the ThroughputBenchmark.

Everything that is added with add_input is passed to the forward function of the model. In my example above, forward only takes x, so passing x2= as in their example, bench.add_input(input[0], x2=input[1]), produces the following error:
TypeError: forward() got an unexpected keyword argument 'x2'
So one should only provide the data necessary for the forward function of the model.
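To illustrate the point about keyword arguments, here is a minimal sketch with a hypothetical two-input module (TwoInputs is made up for this example and is not part of PyTorch or ignite). It assumes that add_input and run_once simply forward their arguments to forward, which is what the experiment above suggests:

```python
import torch
from torch import nn
from torch.utils import ThroughputBenchmark


class TwoInputs(nn.Module):
    """Hypothetical module whose forward accepts a second argument named x2."""

    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(10, 5)

    def forward(self, x1, x2):
        return self.fc(x1) + self.fc(x2)


model = TwoInputs().eval()
bench = ThroughputBenchmark(model)

# Each entry holds the two tensors that forward expects.
inputs = [(torch.rand(10), torch.rand(10)) for _ in range(8)]
for x1, x2 in inputs:
    # x2 is accepted here because forward has a parameter with that name.
    bench.add_input(x1, x2=x2)

# Sanity check: run the module once with the same calling convention.
out = bench.run_once(inputs[0][0], x2=inputs[0][1])

stats = bench.benchmark(
    num_calling_threads=1,
    num_warmup_iters=5,
    num_iters=20,
)
print(stats.latency_avg_ms)
```

So the x2= keyword in the documentation example only makes sense for a module whose forward takes a parameter of that name.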

Before running such a benchmark, one can test an example run with the function
bench.run_once(*args, **kwargs). According to the docstring:

Given input id (input_idx) run benchmark once and return prediction.
This is useful for testing that benchmark actually runs the module you
want it to run. input_idx here is an index into inputs array populated
by calling add_input() method.

Here, the documentation seems to be old/wrong, because once again the input is only passed to the forward function. I don't know yet if these discrepancies are caused by using an nn.Module and not a ScriptModule, but I will look into it in a bit.

EDIT:
Added a real minimal example.
Plus, nothing changes when using a ScriptModule.

Here is one example I came up with, which is nothing more than a tiny wrapper around the module.
The only addition I've made is an input_transform argument, which lets users iterate over their own dataset loader and select the correct input for the model.


from typing import Callable, Iterable, Union

import torch
from torch.utils import ThroughputBenchmark


class ThroughputBenchmarkWrapper:
    def __init__(self, model: Union[torch.nn.Module, torch.jit.ScriptModule]):
        self._bench = ThroughputBenchmark(model)

    def run(
        self,
        sample_data_iterator: Iterable,
        input_transform: Callable = lambda input_data: input_data,
        num_calling_threads=1,
        num_warmup_iters=10,
        num_iters=100,
    ):
        # Register every sample as a benchmark input; input_transform
        # selects the part of the sample that forward() expects
        for sample_data in sample_data_iterator:
            forward_input = input_transform(sample_data)
            self._bench.add_input(forward_input)

        stats = self._bench.benchmark(
            num_calling_threads=num_calling_threads,
            num_warmup_iters=num_warmup_iters,
            num_iters=num_iters,
        )

        return stats

    def run_once(self, forward_input):
        # Single forward pass, useful for checking the input format
        return self._bench.run_once(forward_input)

I don't think this should be attached to an engine, as the model should be converted to a ScriptModule for "correct" testing, and I think automatic conversion could have some pitfalls.

I would like to add the automatic device mapping similar to create_supervised_* because I think it would be interesting to quickly compare the performance of the current GPU vs CPU configuration.
But that could just be me.

I think I would like that as a context manager, similar to what we decided in #596.
This way you would have a benchmark at the beginning, and later on you could just do your training in the very same script.
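A minimal sketch of that attach/detach pattern, using a hypothetical FakeEngine stand-in rather than a real ignite Engine (the class, the attached helper, and the event name string are all made up for illustration). The point is only that the handler is removed when the with block exits, including when the body raises:

```python
import contextlib


class FakeEngine:
    """Hypothetical stand-in for an ignite Engine; it only stores handlers."""

    def __init__(self):
        self.handlers = []

    def add_event_handler(self, event_name, handler):
        self.handlers.append((event_name, handler))

    def remove_event_handler(self, handler, event_name):
        self.handlers.remove((event_name, handler))


@contextlib.contextmanager
def attached(engine, event_name, handler):
    """Attach `handler` for the duration of the with block only."""
    engine.add_event_handler(event_name, handler)
    try:
        yield engine
    finally:
        # Runs on normal exit and when the body raises.
        engine.remove_event_handler(handler, event_name)


def collect_batch(engine):
    """Placeholder handler; a real one would call bench.add_input(...)."""


engine = FakeEngine()
with attached(engine, "iteration_started", collect_batch):
    handlers_inside = len(engine.handlers)
handlers_after = len(engine.handlers)
print(handlers_inside, handlers_after)
```

After the block, the same engine can be reused for training without the benchmark handlers.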

Hmm. So now my approach would look something like this:

import contextlib
from typing import Callable, Union

import torch
from torch.utils import ThroughputBenchmark
from ignite.engine import Events


class ThroughputBenchmarkWrapper:
    def __init__(
        self,
        model: Union[torch.nn.Module, torch.jit.ScriptModule],
        num_calling_threads=1,
        num_warmup_iters=10,
        num_iters=100,
    ):
        self._bench = ThroughputBenchmark(model)
        self._stats = None
        self._num_calling_threads = num_calling_threads
        self._num_warmup_iters = num_warmup_iters
        self._num_iters = num_iters

    def _batch_logger(self, engine, input_transform):
        # Register the forward input of the current batch with the benchmark
        input_data = input_transform(engine.state.batch)
        self._bench.add_input(input_data)

    def _run(self, engine):
        # Run the benchmark on the collected inputs and keep the stats
        self._stats = self._bench.benchmark(
            num_calling_threads=self._num_calling_threads,
            num_warmup_iters=self._num_warmup_iters,
            num_iters=self._num_iters,
        )

    def _detach(self, engine):
        if engine.has_event_handler(self._batch_logger, Events.ITERATION_STARTED):
            engine.remove_event_handler(self._batch_logger, Events.ITERATION_STARTED)
        if engine.has_event_handler(self._run, Events.COMPLETED):
            engine.remove_event_handler(self._run, Events.COMPLETED)

    @contextlib.contextmanager
    def attach(self, engine, input_transform: Callable = lambda input_batch: input_batch[0]):
        if not engine.has_event_handler(self._run):
            engine.add_event_handler(
                Events.ITERATION_STARTED, self._batch_logger, input_transform
            )
            engine.add_event_handler(
                Events.COMPLETED, self._run,
            )

        try:
            yield engine
        finally:
            # Always remove the handlers, even if the engine run raises
            self._detach(engine)

    @property
    def stats(self):
        if self._stats is None:
            raise RuntimeError(
                "Benchmark wrapper didn't run yet so results can't be retrieved."
            )
        return self._stats

Problems I have with this approach:

  • I have to access the data part of the batch. The user has to make sure that the batch is correctly transformed to the desired data, especially if they use create_supervised*'s prepare_batch. Maybe a note in the docstring would be enough.
  • The user could be inclined to benchmark the complete dataset, which could be too much data. I am thinking of either using an internal counter or using CustomPeriodicEvent once.

You could use an argument max_batches together with a counter for this. And I think a note in the docstring should be sufficient.
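As a rough sketch of the counter idea (LimitedCollector is a hypothetical helper, not an ignite or PyTorch class, and a plain list stands in for bench.add_input), the collector forwards inputs until max_batches is reached and ignores everything after that:

```python
class LimitedCollector:
    """Forward at most `max_batches` inputs to `add_input`, ignore the rest."""

    def __init__(self, add_input, max_batches=10):
        self._add_input = add_input
        self._max_batches = max_batches
        self._seen = 0

    def __call__(self, batch):
        if self._seen >= self._max_batches:
            return  # limit reached, skip this batch
        self._add_input(batch)
        self._seen += 1


# A list's append stands in for bench.add_input in this sketch.
collected = []
collector = LimitedCollector(collected.append, max_batches=3)
for batch in range(10):
    collector(batch)

print(collected)  # [0, 1, 2]
```

In the wrapper, the same check could live inside the iteration handler, so that only the first few batches of a large dataset end up in the benchmark.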

I've used a custom event_filter (mostly because I just want to get more familiar with the code)

import contextlib
from typing import Callable, Union

import torch
from torch.utils import ThroughputBenchmark
from ignite.engine import Events, Engine


class ThroughputBenchmarkWrapper:
    def __init__(
        self,
        model: Union[torch.nn.Module, torch.jit.ScriptModule],
        num_calling_threads: int = 1,
        num_warmup_iters: int = 10,
        num_iters: int = 100,
    ):
        self._bench = ThroughputBenchmark(model)
        self._num_calling_threads = num_calling_threads
        self._num_warmup_iters = num_warmup_iters
        self._num_iters = num_iters
        self._stats = None

    def _batch_logger(self, engine: Engine, input_transform: Callable):
        # Register the forward input of the current batch with the benchmark
        input_data = input_transform(engine.state.batch)
        self._bench.add_input(input_data)

    def _run(self, engine: Engine):
        # Run the benchmark on the collected inputs and keep the stats
        self._stats = self._bench.benchmark(
            num_calling_threads=self._num_calling_threads,
            num_warmup_iters=self._num_warmup_iters,
            num_iters=self._num_iters,
        )

    def _detach(self, engine: Engine):
        if engine.has_event_handler(self._batch_logger, Events.ITERATION_STARTED):
            engine.remove_event_handler(self._batch_logger, Events.ITERATION_STARTED)
        if engine.has_event_handler(self._run, Events.COMPLETED):
            engine.remove_event_handler(self._run, Events.COMPLETED)

    @contextlib.contextmanager
    def attach(self, engine: Engine, max_batches: int = 10, input_transform: Callable = lambda input_batch: input_batch[0]):
        def under_max_batches(engine: Engine, event: int):
            # The event counter starts at 1, so this keeps the first max_batches iterations
            return event <= max_batches

        if not engine.has_event_handler(self._run):
            engine.add_event_handler(
                Events.ITERATION_STARTED(event_filter=under_max_batches),
                self._batch_logger,
                input_transform,
            )
            engine.add_event_handler(
                Events.COMPLETED, self._run,
            )

        try:
            yield engine
        finally:
            # Always remove the handlers, even if the engine run raises
            self._detach(engine)

    @property
    def stats(self):
        if self._stats is None:
            raise RuntimeError(
                "Benchmark wrapper hasn't run yet so results can't be retrieved."
            )
        return self._stats

I hope it is fine that I am not including the docstrings and the input tests here, to keep the output short and because we are still discussing the design. (These are my first contributions to open-source projects, sorry for sometimes asking trivial questions.)

IMO this looks fine. Can you maybe open a PR on that and write tests and a short example for this part? This way it would become easier for us to see whether this works or not, and we can comment on more specific parts of the code!

@kai-tub thanks for the code, yes, it looks fine! I agree with Justus, please, open a PR. We have some guidelines on how to contribute: CONTRIBUTING.md. Please, feel free to ask questions if something is not clear about that :)
