[TL;DR: We're designing a new input pipeline API for TensorFlow, and we'd like to collect your feature requests on this issue.]
We've noticed that one of the biggest challenges in getting started with TensorFlow is how to load your own data into your programs. While TensorFlow has several methods that can be used to build complex input pipelines (such as tf.train.string_input_producer(), tf.train.batch(), etc.), they were designed for a particular use case (processing a static set of files repeatedly), and the average user experience with these methods is not great. For example:
- Reaching the end of the input is reported by raising an exception (tf.errors.OutOfRangeError), which the training loop must catch.
- The pipelines do nothing until you call tf.train.start_queue_runners(sess): in fact, they hang indefinitely and deadlock the user program.

We've decided to start from a clean slate and redesign the input pipeline API. The existing methods will remain until TF 2.0 (at least), but we are planning to add a new set of methods for loading and manipulating datasets. We're still preparing a detailed design, which we plan to share soon, but we anticipate that there will be two new APIs:
- A Dataset represents a collection of data elements. Each element can be a tuple of one or more tensors (e.g. an image and its label). We will provide methods for creating datasets from tensors, and deriving them from another dataset (e.g. by slicing its elements, repeating its elements, shuffling its elements, batching its elements, mapping a function over its elements, etc.).
- An Iterator can be created from a Dataset. An iterator represents the current position within a dataset, and exposes an operation (like tf.QueueBase.dequeue()) that can be run to get the next element. There will be explicit operations for initializing an iterator, so that it can be reused after you have processed all of the elements in a dataset.

A similar pattern turns up in many different settings, including Java's Stream API, Scala's collections (and hence Spark's RDDs), and .NET's Language Integrated Query.
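For a sense of what this could look like in practice, here is a minimal sketch written against the tf.contrib.data names that appear later in this thread (purely illustrative; the design was still being finalized at this point):

import numpy as np
import tensorflow as tf

# Build a dataset from in-memory arrays, transform it, and pull elements
# through an iterator op.
features = np.random.rand(1000, 28, 28).astype(np.float32)
labels = np.random.randint(10, size=1000).astype(np.int64)

dataset = tf.contrib.data.Dataset.from_tensor_slices((features, labels))
dataset = dataset.shuffle(buffer_size=1000).batch(32).repeat(5)

iterator = dataset.make_one_shot_iterator()
next_features, next_labels = iterator.get_next()

with tf.Session() as sess:
    while True:
        try:
            batch_features, batch_labels = sess.run([next_features, next_labels])
        except tf.errors.OutOfRangeError:
            break  # all 5 repeated epochs have been consumed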
We're announcing this plan early because we want to collect feedback on what features you—as TensorFlow users—would like to see in an input pipeline API. What other pain points have we missed? What features do you miss from other systems? What other suggestions do you have?
We look forward to hearing from you!
A must-have for one of our use cases is ad-hoc creation of data elements via a callback function (which creates tensors on the fly, e.g. using py_func() or through some other means).
More specifically, we currently have a use case where we employ two queues; an outer one, using a string_input_producer (with shuffling), where each string denotes/points to a "data set", and the inner queue is then produced by generating a variable amount of samples from each "data set". Which and how many samples are generated differs per epoch (potentially conditional on past training behavior). Actually, we don't even use the nomenclature of an epoch anymore, since the same data is never seen twice, and above mentioned generation/sampling goes beyond the usual data augmentation.
Long story short: With a slightly out-of-the-ordinary use case, we've been hit by pretty much all of the problems you have mentioned above, and our workarounds have not been pretty. We'd be extremely happy to see a very flexible mechanism, where such cases are supported, and data generation doesn't have to be shoehorned into forced-upon concepts like epochs, finitely repeating queues, etc. (although they can be modeled by its primitives).
I am not sure how well the planned Dataset/Iterator API would support this.
Edit: Things we still need, of course, include multi-threaded data generation and multi-threaded random-shuffle producer-consumer queues. But without the bane of the GIL -- maybe via easy C++/Eigen hooks and thread control on the native side? Back and forth, via pybind?
Edit2: The new input pipeline should also take support for variable-sized tensors (i.e. different per example) into account, for both training and inference, e.g. in a fully-convolutional setting.
@kmhofmann We'll certainly support tf.py_func() inside a new-style input pipeline (as well as, in general, compositions of any other TensorFlow ops). I'd like to understand more about your use case, though. How frequently do you move from one outer "data set" to the next? Are there any specific operations that you perform at the end of a "data set" or can your training loop handle the concatenation of records from different "data sets"?
We're planning to have a few nested iteration primitives, so that you can write a function mapping an element (e.g. a string representing your outer "data set") to a Dataset (representing the records in that "data set") and then flattening them down to a single Dataset. (Think SelectMany() in C#, flatMap() in Java and Scala.) So I think you could implement your logic for sampling from a "data set" in one of these flatMap() functions.
Let me know if any of this is unclear!
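To make the nested-iteration idea concrete, here is a minimal sketch of a flatMap-style composition, using the tf.contrib.data names that appear later in this thread (the file names are placeholders):

import tensorflow as tf

# Each element of the outer dataset names one "data set" (here, a text file);
# flat_map expands it into that file's records and flattens the result.
filenames = tf.contrib.data.Dataset.from_tensor_slices(
    ["set_a.txt", "set_b.txt", "set_c.txt"])  # placeholder file names

records = filenames.shuffle(buffer_size=3).flat_map(
    lambda filename: tf.contrib.data.TextLineDataset(filename))
records = records.shuffle(buffer_size=10000)  # mix records across "data sets"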
Oh good timing! Now I can stop writing my own (horrible) dataset class. Many of the things said already resonate with my experience.
To the extent possible, I would like to code dataset-independent tensorflow computations. I don't want to have 3 different gan classes: each with their own create graph and fit methods, simply because one dataset doesn't fit in memory, the other is an np.array, and the other is generated on the fly.
The use case that affects me the most is [do n times: train for k iter/epoch, validate model, repeat]. There are clear problems with queues, like you said. A minor issue for which I offer no solution is that while the scheduling (how long to train before validating) is perhaps done by some method of a model class that I would like to be dataset-independent, whether it makes sense to talk in terms of iterations or epochs is determined by the dataset -- ruining some of the independence.
Some other ideas I jotted down while brainstorming my own class:
session.run(enqueue_op, feed_dict=$some_numpy_batch_input), where you can asynchronously feed the queue from Python.

Good point. One thing is that currently the queue operations are "baked into" the computation graph, so it's hard to modify anything on the fly. A higher abstraction can make this much easier without resorting to control flow or other hacks.
For a lot of my use cases, my input data is either 1. not on the file system, or 2. requires complex preprocessing unavailable in TensorFlow. For both cases the existing input pipeline cannot help, so I use an input thread with enqueue/feed_dict + a training thread with dequeue a lot.
Let's assume that in most cases you don't need to use the model itself to produce data (though sometimes that's not true). Then a solution I would really like to see is being able to receive (similar to dequeue) tensors from a different process (like #4836).
The benefits are:
These are the features I really missed from a private system I've been using.
One disadvantage is that IPC/socket has smaller bandwidth than RAM but usually it's not a bottleneck.
I know this feature may be too far away, but I hope the new design could allow such possible future feature.
@mrry One "data set" can be composed of anything between ~500-30,000 dynamically generated samples. At the moment, we don't perform specific operations at the end of each data set, i.e. everything gets put into the same (large) random shuffle queue, to mix samples between data sets. But I could also imagine cases where separation of sets might be helpful.
Please support reading hdf5 file directly.
Personally, I'm a very big fan of the feed_dict method of feeding data into the graph. It is by far the most flexible, makes debugging way easier and makes for much simpler code. Thus my biggest wish would be to make that method more performant. Right now, this method starves my GPU all the time, which is a shame because most other DL frameworks (even those based on computational graphs) manage to handle this much more performantly. I assume there is more copying/handling going on in the background than would be necessary.
I am glad to see this initiative. The input pipeline is definitely the steepest part of the learning curve.
I'd like:
- A single API for both small (feed_dict) datasets and large ones, so that the same code scales and your model only has to talk to one API. Although I have not used it yet, I liked what I read in the input pipeline documentation.
- Container protocols like __len__ are great for progress reporting.
- No Dataset class, because, IMHO, the "dataset" concept is ill-defined. The Dataset class described in the original post already exists in Python: it is a list of tuples. And what is a "dataset", anyway? A collection of train/valid/test data or simply a collection of data? Is it just a file? A directory? A generator? Is each data item an (input/target) couple? Is that always true? Is the dictionary part of a text dataset?
- Rather than a single Dataset container, I would prefer a rich set of containers offering different trade-offs with respect to memory/time complexity. In addition, I would like a rich set of iterators, splitters, loaders, dumpers, slicers, repeaters, servers, and generators to actually work with data coming from various sources.
- Knowing the dataset size, so that epoch = global_step / steps_per_epoch and steps_per_epoch = dataset_size / batch_size.

Here is my attempt to translate to small in-memory datasets some of the routines available in TF's input pipeline for large datasets. Here are some examples of what I would like to see available in TensorFlow:
These routines demonstrate how far you can go with just simple iterators over lists of indices.
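As a small concrete example of that approach, a minimal index-iterator sketch (plain NumPy, not tied to any TF API):

import numpy as np

def minibatch_indices(dataset_size, batch_size, shuffle=True, seed=None):
    """Yield index arrays covering one epoch of an in-memory dataset."""
    rng = np.random.RandomState(seed)
    indices = np.arange(dataset_size)
    if shuffle:
        rng.shuffle(indices)
    for start in range(0, dataset_size, batch_size):
        yield indices[start:start + batch_size]

# Usage with feed_dict:
#   for batch_idx in minibatch_indices(len(features), 32):
#       sess.run(train_op, feed_dict={x: features[batch_idx], y: labels[batch_idx]})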
+1 to something like feed_dict. That's the only way to learn by interacting with external world (training robot arms, Atari games, Universe ).
It could be made more efficient by avoiding copies. Like PyTorch, whose Tensors share memory buffers with underlying numpy arrays
I don't know TF as well as others here, so please take my comments with some skepticism:
With tf.py_func I was able to solve most of my input-related problems, like loading .mat files in a symbolic-ish manner. The one I'm currently struggling with is the integration of tf.train.batch with the ability of picking the source from which the input should come, for having train/val data in the same symbolic variable. #8168
I understand these functions were initially intended for simple use cases, but it would be nice to have more control of the pipeline without the burden of managing _everything_ (e.g. using tf.QueueBase.from_list but being forced to feed queues and manage threads kind of manually).
I'm not sure if TensorFlow optimizes the dequeue operation under the hood but, if not, I think we could greatly benefit from a parallel dequeue operation that loads data (i.e. the next batch) into GPU memory while it processes the previous data (i.e. the current batch).
I think feed_dict-like solutions are not optimal for passing big chunks of data to the train function, like a batch of images, since they're basically a pause in the execution graph to force TF to interact with _python's runtime_. An in-graph solution sounds better, with pointers to guide the graph execution, like feed_dict={is_training: True} to indicate the input should come from the training pipeline, the model should set batchnorm and dropout (et al.) to train mode, etc. This way, TF could better optimize/parallelize the execution, and all solutions would scale.
The standard functions for creating batches apparently do not provide an index to indicate which batch we are processing. For example, a slice_input_producer receives the number of epochs to be generated but there seems to be no way of knowing the epoch of one sample without counting how many we have already evaluated.
right now there are two very divergent paths to getting data into Tensorflow: feed_dict and queues. queues are wonderful until you don't have a way to manipulate your data natively -- for example, if you want to load a .wav file, chop it into parts, and convert it to a spectrogram. at that point, you have to write a C++ op (doable, but a context switch + it makes a very inflexible pipeline) or pop back into Python land (slower, but very easy and flexible).
it seems like the best compromise between speed and flexibility is to create a TF queue and then make a bunch of Python threads that feed it with data. this allows you to do flexible data processing in Python (roughly parallelized on the CPU, apart from GIL issues) while maintaining some amount of speed benefit.
what if you just formalized that? the interface would be: push_data, end_of_data (for signaling the end of an epoch), and a dequeue_batch function that feeds the model. then your code could just load data in Python and stuff it onto the queue in parallel, while the model sits totally separate from all of that.
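That pattern can already be built by hand today; a minimal sketch of what is being described, with load_example standing in for arbitrary Python loading/preprocessing (shapes and thread count are arbitrary):

import threading
import numpy as np
import tensorflow as tf

# Stand-in for arbitrary Python work (e.g. load a .wav, chop it up, compute a spectrogram).
def load_example():
    return np.random.rand(224, 224, 3).astype(np.float32), np.random.randint(10)

image_ph = tf.placeholder(tf.float32, shape=[224, 224, 3])
label_ph = tf.placeholder(tf.int64, shape=[])
queue = tf.FIFOQueue(capacity=256, dtypes=[tf.float32, tf.int64],
                     shapes=[[224, 224, 3], []])
enqueue_op = queue.enqueue([image_ph, label_ph])
images, labels = queue.dequeue_many(32)  # the "dequeue_batch" the model consumes

def feeder(sess):
    while True:
        image, label = load_example()
        sess.run(enqueue_op, feed_dict={image_ph: image, label_ph: label})

sess = tf.Session()
for _ in range(4):  # a few feeder threads, roughly parallel modulo the GIL
    t = threading.Thread(target=feeder, args=(sess,))
    t.daemon = True
    t.start()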
We should make feed_dict faster (likely by not copying the numpy.arrays like @yaroslavvb mentioned), but that's orthogonal to this change. No matter how much we optimize it, feed_dict will never be the fastest way to feed data into a training job.
feed_dict specifically may not be essential. To be more precise, we need support for pipelines where learning is done in an online fashion, and training data is generated by a system responding to actions of a TensorFlow network (learning Atari simulator, robotics simulator, robot interacting with real world, etc). This is necessary for most of the applications at OpenAI, here's one example -- https://github.com/openai/universe-starter-agent
The fastest option would be to create a TensorFlow op that maintains state, takes actions as input, and generates the training data. Then add a placeholder to specify the action.
My guess is that you're looking for something that can be done completely in Python, though. There may be some mid-point between the two.
I am not sure if this concept has been brought up yet, but I will at least put the problem in my own terms.
In dealing with RL problems and the training replay buffer, I couldn't find an easy way to use the Queues to speed up this feeding of samples through the feed_dict. Also, when randomly creating a sample set, it seemed like the samples were consumed when I wanted them left in the buffer.
What I was hoping to do is feed (possibly through feed_dict, or file) a Queue with a new sample and once the size of the buffer is exceeded, the oldest sample is removed from the buffer. So some concept of "sample age" would be nice. I am sure using a circular buffer will work to fix to a number of samples, but "age" might be of interest as well, maybe passed as part of the tuple, but in the RL case, simply the sequence of the sample being added might cover the age (FIFO).
Again, it may have just not been clear to me how to use the queues, but being able to randomly pull a mini-batch from this sample buffer and not remove the samples so a new set of samples can be collected (possibly with prior sampled examples) would be nice.
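For reference, a minimal sketch of that kind of fixed-size replay buffer kept on the Python side and sampled into feed_dict (purely illustrative, nothing TF-specific):

import random
from collections import deque
import numpy as np

class ReplayBuffer(object):
    """Fixed-capacity buffer: the oldest sample is evicted, sampling does not remove."""
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)  # deque handles FIFO eviction ("age")

    def add(self, state, action, reward, next_state):
        self.buffer.append((state, action, reward, next_state))

    def sample(self, batch_size):
        batch = random.sample(list(self.buffer), batch_size)  # samples stay in the buffer
        return [np.array(x) for x in zip(*batch)]

# buffer.add(s, a, r, s2) during interaction, then:
#   states, actions, rewards, next_states = buffer.sample(32)
#   sess.run(train_op, feed_dict={...})  # feed the sampled arrays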
I may not understand the distributed settings that the TF data input pipeline API is targeting. Is it possible to have a simple API design like PyTorch's: only three simple classes? I can pick up PyTorch's dataset API in 5 minutes and it's good enough for all the popular academic datasets. http://pytorch.org/docs/data.html
It's great to see new efforts to solve the pain points in TF dataset API. Looking forward to a simple/beautiful/flexible API with minimum number of classes/concepts introduced. Thanks.
@lming Yeah, the first two comments here cover that: by making a Dataset implementation that uses py_func, it'd be equivalent to the PyTorch implementation.
I second @lming's sentiment above.
Our biggest issue with the current data loading scheme is just that it's very complicated and involves a lot of new concepts.
We don't find it spectacularly difficult to write a multithreaded data loader ourselves in Python, and generally we don't find it overly difficult to ensure that our data loading and preprocessing runs sufficiently quickly that it doesn't actually bottleneck training.
Where we're stuck is that to optimally follow recommendations, we end up in an awkward situation, one of:
- using feed_dict and suffering any relevant performance hits
- using py_func, but still using the TF API for managing queue runners

The Python threading API isn't perfect, but in general when we're doing mostly non-GIL-taking tasks in NumPy or whatever, the TF queue API seems more of a burden than a help.
A couple of concrete use cases that come up for us:
- We use AffineTransform objects to apply cropping and resizing operations for this model, because those objects let us easily translate our model output back to the original input coordinate space. It seems tricky to square this with a fully tensor-based API.
- ... Iterator, which seems less straightforward.

We primarily deal with time series data, and prefer not to have to batch-preprocess the whole dataset prior to training every unique model input architecture. In fact, the preprocessed dataset size for one input architecture variant can easily be an order of magnitude larger than the unprocessed file set size.
We've been able to deal with this using the current system of queues, TF ops, batch_join/etc. to enable multiple on-the-fly preprocessing threads with across-file example mixing. I have to say it's really nice and flexible for hyperparameter tuning of the input architecture, and I like that the entire pipeline lives in the graph, feeds data in response to sess.run(train_op) calls, and can be restored from a common checkpoint with the model.
If you're planning to deprecate the current queues paradigm, I would like to know that the Dataset and Iterator would enable the same flexibility. For my use case it seems like Dataset could represent a collection of time series, and the Iterator would behave like a python iterator/generator and could handle any preprocessing to form batches of examples?
feature request: control mechanism for queues, especially in combination with TFRecordReader's/TextFileReader's read() method, which automatically dequeues. Reason: the automatic preevaluation of pending enqueues.
MXNet IIterator is a more relevant example than Java's Stream API, Scala's collections (and hence Spark's RDDs), and .NET's Language Integrated Query. The design enables flexible composition of various components of the input pipeline such as ImageRecordIter, ImageNormalizeIter, BatchLoader, PrefetcherIter and ImageAugmenter.
MXNET_REGISTER_IO_ITER(ImageRecordIter)
.describe("Create iterator for dataset packed in recordio.")
.add_arguments(ImageRecParserParam::__FIELDS__())
.add_arguments(ImageRecordParam::__FIELDS__())
.add_arguments(BatchParam::__FIELDS__())
.add_arguments(PrefetcherParam::__FIELDS__())
.add_arguments(ListDefaultAugParams())
.add_arguments(ImageNormalizeParam::__FIELDS__())
.set_body([]() {
    return new PrefetcherIter(
        new BatchLoader(
            new ImageNormalizeIter(
                new ImageRecordIter<real_t>())));
  });
Caffe DB is simpler but still usable.
Ideally, the newly designed API should be able to load existing datasets of Caffe & MXNet with easy to implement plugins.
Just my 2 cents. Happy for this decision. I think that a huge effort should be placed in tutorials: the biggest difficulty I am having -- and some colleagues with me -- is that the documentation you can find is quite lousy and not very self-contained. I would be happy to help, of course.
A Dataset represents a collection of data elements.
I'm not sure if this is implied but a requirement would be to stream large datasets from disk. A Dataset explicitly containing all data elements doesn't seem to support that.
In the last release notes I see that you have added a new RecordInput class, which seems to be the new class intended to be used as an input provider? Unfortunately the documentation is still lacking further explanations; I can only find some basic info in the C++ API docs.
I would be really interested to read something for the Python API + some example code. If you need any help, feel free to contact me; @petrux e.g. also offered help. I think he is right that extending the documentation and providing better tutorials is highly important, because otherwise people will stick with feed_dict inputs until TensorFlow 3.0 and moan about the bad performance of TF.
I ended up doing some benchmarking for other reasons, and observed comparable performance between feed_dict and using queues: https://github.com/tensorflow/tensorflow/issues/9322#issuecomment-295775991
Feed dict overhead is essentially the cost of doing an extra memcpy (Python->TensorFlow CPU->TensorFlow GPU) vs. using native ops like queues which do (TensorFlow CPU->TensorFlow GPU). So if this memcpy is small, the overhead should be negligible.
That makes sense, and it's what I was assuming.
I think that makes the advice against feed_dict a bit overblown, though – really the issue seems more like inefficient data feeding that starves the GPUs, rather than the use of feed_dict itself.
@yaroslavvb correct me if I'm wrong, but this isn't entirely right. Unless you have implemented an input pipeline using Python's Queue library or something similar, there is additionally the time for loading data from disk into memory and possibly preprocessing it.
For images, and especially larger batch sizes, this might take quite a while. Here is where you can really speed things up using TF's input queues, because they will load e.g. images from disk (+ preprocess) on the CPU while you are training/evaluating your network on the GPU. When computations are done, you can directly grab the next batch and copy the data to the GPU, without waiting for native Python to load new data into memory.
@kratzert, that's precisely what @taion means by
the issue seems more like inefficient data feeding that starves the GPUs
It's hard to do asynchronous preprocessing well, so most users benefit from tf.train doing it for them.
Is it? By no means do I want to defend TF's input queues, but as I read the post of @yaroslavvb, he states that the additional time comes (only) from passing memory between native Python and TF (+ GPU). The only thing I wanted to add is that if you can't store all your training data in memory, loading data from disk into memory also adds time to one training cycle. I can't find any of this in the post of @taion, but it could be a misunderstanding on my side as I'm not a native English speaker.
And I know, that for many cases asynchronous preprocessing is not possible, but for the cases it is (simple training of image classification CNN) TFs input queues help quite a lot.
The only thing I wanted to add is that if you can't store all your training data in memory, loading data from disk into memory also adds time to one training cycle.
That also applies to tf.train. Data doesn't just appear in the graph without reading it from disk, regardless of whether you do it via Python or TensorFlow's execution engine.
Anyway, I find memmapped NumPy arrays via joblib.cache.Memory and feed_dict to be quite performant (GPU load over 90% throughout a training session), despite the extra copy.
Ah okay so maybe a misunderstanding. Thought I made myself clearer.
I know that data does not appear magically in memory if I use TF. But TF makes it quite easy to place e.g. image loading + preprocessing explicitly on the CPU and graph computation on the GPU, and both are done in parallel. So while the GPU computes ops on some data, the CPU is already loading the next data into memory. And since this is done in parallel, we effectively reduce the computation time by the time spent loading data from disk (since this usually takes less time than one forward + backward pass through the network's graph). But yes, this only applies when working with CPU + GPU and has no effect if you use the CPU only.
edit: Maybe the only thing I like about TF's input queues is that the state of the queues (and batch producers) can be observed in TensorBoard. For the rest, I fought quite some time to get them running with all the preprocessing I wanted and with queues for testing and validation in the same run.
I would say that aside from the steep learning curve of the input pipeline, which can be overcome with documentation, the key missing points are:
- ... input_map provides such functionality, but it is rather hacky. And documented too (when it comes to import_meta_graph).

I'd support @nicolasdespres for the "no Dataset" pledge, mainly because an all-in-one bundle is not flexible, not future-proof, and also not consistent with TF's paradigm of providing small, stable, well-defined and assemblable blocks for building custom models. Having some bundled, predefined "easy-starter" wrappers would be welcome.
I think TF does not really require another attempt to unify the data pre-processing and put it directly into the graph. Things get worse if one needs custom stuff and on-the-fly generation/modification of data. Typically these modifications are not part of the forward model for a good reason: these operations do not require any backpropagation. Hence, they should be only loosely coupled.
So the ideal input pipeline (everything without backprop) should be quite simple and slim: it should consist of a queue operation which receives data (a list of tensors) from some source (sockets). #8728 is a good step in this direction with the pros:
I am not sure, if you really need something else and I do not understand why you really need tf.image.random_* in the graph.
I don't think the proposal here is to get rid of queues entirely, is it?
Dataset-style abstractions are pretty common in this space, and they're quite useful. The existence of a higher-level abstraction doesn't preclude the lower-level API from also existing.
In fact for these kinds of higher-level abstractions, sooner seems better than later – one of the greatest frustrations of reading published TF research code is that the vast majority of codebases use their own idiosyncratic layers library, as opposed to e.g. the ones in tf.layers or tf.contrib.layers, and these libraries are all different, which makes it more difficult than it should be to share work.
I often try to use TensorFlow on very large inputs (potentially >1GB minibatch) with relatively light computation on each minibatch. These inputs are in a HDF5 file or a Numpy array either on disk or in memory, so I typically feed with feed_dict, potentially asynchronously into a queue. When running with multiple GPUs, TensorFlow is not able to even saturate the PCI-e bandwidth to the GPUs because of the memcpy from the feed_dict to the CPU tensor.
As @yaroslavvb mentioned, the feed_dict memcpy (on a single CPU core?) can be a huge performance bottleneck, and I'd like to see this addressed in any refactor of TensorFlow's input handling.
@jhseu You mentioned that you consider removing the feed_dict copy as orthogonal to this issue. Do you know if there's any issue or work being done on removing the copy (at least in some cases, like row-major Numpy arrays with nice strides)?
@eamartin there have been a number of changes since the beginning of March by @alextp to speed up feed_dict; when the memory is aligned with 16 bytes, I think we share buffers with numpy, so nightly releases may be faster for you.
The 16 byte alignment issue comes from Eigen, unfortunately, which requires the beginning of the memory addresses to be aligned with 16 bytes. I'm not sure why Eigen was not written to handle unaligned first and last "packets" so it wouldn't matter. :/
Would it be possible for numpy to share buffers with tensorflow variables when they are returned from a session run? I realise this will probably raise all sorts of mutability and state issues, but these should be avoidable by setting the WRITABLE flag on the returned numpy arrays to false.
Thanks for the info @vrv (and for the features @alextp ). I did a little looking around, and it looks like https://github.com/tensorflow/tensorflow/commits?author=alextp&since=2017-03-01T06:00:00Z&until=2017-04-01T05:00:00Z are the relevant commits. From my checking, these didn't make it into TF 1.1 but hopefully will be in 1.2.
Another thing that would be cool would be the ability for Sessions to return Futures that could then be used as input to other Session runs. Said future could then be passed through the graph until the Tensor it represents requires evaluation.
with tf.Session() as S:
    # Note executor semantics
    future = S.submit(op_1, feed_dict={'input_1': 1.0, 'input_2': 2.0})
    result = S.run(op_2, feed_dict={'data': future})
This concept is inspired by dask distributed and other executor frameworks -- I think the flexibility offered by this abstraction is great!
@sjperkins sadly the way our current unit tests are written makes them break if variables are returned without extra copies, because many tests do a = session.run(variable); session.run(update_variable); b = session.run(variable); assertDifferent(a, b), which fails if they share buffers.
I considered making a ConfigProto option to share buffers even when they are not exclusively owned by the C-python bridge but didn't. Should be easy to do from the commits listed above, if you're interested.
@sjperkins for the futures, stay tuned, as we're prototyping something in this direction.
My primary request is that however you build the new input pipeline system, it should be completely separate from the rest of the graph. I'm giving a shot at migrating to tensorflow for our deep learning models for devices; but the input pipeline is so tightly bound to the rest of the compute graph that it's like performing surgery to run inference against it.
Example: I trained using TFRecords and input queues; I got my weights/model. I want to perform inference by running my prediction operation; but because the input queue runners etc are part of the graph before that; I am stuck with that mechanism for performing inference.
See issue here: http://stackoverflow.com/questions/43708616/tensorflow-inference
I like the tf record and queue runner thing now that I'm used to it; the issue is the tight binding to the graph....
This is why tools like tf.estimator.Estimator were developed, to allow for easier separation of concerns between training and inference and to allow for swapping of input pipelines. Can you use Estimator to write your model?
Currently I am worried that my training/prediction preprocessing will diverge over time.
I would be interested in a pipeline where:
- ... (similar to tf.py_func, but able to provide the implementation at runtime)

The preprocessing and post-processing do not require backprop, but they still need to carry some values with them (normalization divisors or one-hot mappings). It would be ideal to have some Op that can learn values like this during training and carry them over to production within the serialized graph, and then I could provide an implementation for this pre/post-processing Op in the target production language.
@mirosval, it seems to me that this might be what tf.Estimator is intended for eventually.
Not sure if this was mentioned above and I missed it, but I would appreciate a much easier way to switch between train and validation data sets. When using feed_dict, this is very simple, and this is what I'm used to. I've recently been trying to make the switch from feed_dict, but this has been (at least in my limited experience) a major difficulty. The tutorial page here mainly just suggests using separate processes, but this can be a pain, especially if I want to do early stopping based on the validation data. If I could create some sort of input method (Queue, Dataset, whatever) where I can cleanly swap between training and validation inputs, that would be much nicer (again, feed_dict is great for this, but if it will always be slower, it'd be nice to have a more performant alternative).
@neighthan: yes this is something that @mrry has planned as well :)
@neighthan Using current constructs, we've built a SwitchableDataSet that allows you to switch at run time between train, validation, and test.
@kdavis-mozilla that sounds interesting, but the link appears to be dead. Is there another reference?
@neighthan Sorry, fixed.
Very promising! That is one of my biggest issues with the current API.
That said, looking at SwitchableDataSet, it seems like implementing new kinds of data feeding use cases will be mostly done by implementing use-case specific classes. Will the new programming model also feature an API to, say, implement what the SwitchableDataSet offers and beyond from more generic, lower-level primitives? I'm just wondering about what things users will come up with (w.r.t. data generation and usage) that would otherwise require (specific) additions to the API...
Am I correct to assume that https://github.com/tensorflow/tensorflow/tree/master/tensorflow/contrib/learn/python/learn/dataframe is part of the new input pipeline initiative? Is there any code out there that uses this, even if that code is undocumented? I'd like to see an example of it in use.
I also assume that the dataframes and transforms are intended to be closely integrated with estimators? At the moment, I don't see how estimators fit (nicely) with the dataframes and transforms. I see where you have ways to generate feed_fns from dataframes for estimators, but it seems more like an adapter to another approach, instead of part of the pipeline design.
I understand this is all very new and under development. I really like what I see! Keep up the good work.
@vonclites That's for interacting with Pandas DataFrame/Series, not the new Dataset construct.
@jimfleming It seems to be more general than that. There are methods to create TensorFlowDataFrames from csv files, dicts, numpy arrays, TFRecords, as well as from pandas. It follows the nomenclature of pandas, but it also resembles Spark's pipeline, as @mrry mentioned, and has many of the features he described in the original post.
I agree that it's fairly generalized and it may be the basis for future work, but this has been available for quite a while and most of the files haven't been updated in months.
@jimfleming Good point
First documentation on master under tf.contrib.data: https://github.com/tensorflow/tensorflow/tree/master/tensorflow/contrib/data
All this seems pretty amazing !
Any plans for supporting a seekable file format, which can be read starting from an arbitrary offset?
Below is why we need it:
Assume there is a large training data set which is in text format, and we need to convert it into tfrecord format. Then we start a map-reduce job, convert it into 10 tfrecord files, and start 10 workers to read them. Perfect! Then we want to run it faster, so we change the worker count to 20, 30, 40, ... It would be great if we could do this without regenerating the training data.
Solution:
First, we need to pass an offset and a length together with the filename into Dataset's constructor. The filename alone is not enough.
Second, the file format itself must be seekable (aka splittable). It should be one of the following:
This new API is faster than the old one. I've integrated this new dataset API into my factorization machine trainer, and it saved me 20% training time. Thanks @mrry
I feel like the default code for looping over a dataset is a bit ugly with an exception breaking out of a while True loop:
sess.run(iterator.initializer)
while True:
    try:
        sess.run(train_op)
    except tf.errors.OutOfRangeError:
        break
Wouldn't there be a way to have an is_empty tensor indicating when the iterator is empty? Like:
sess.run(iterator.initializer)
is_empty = False
while not is_empty:
    _, is_empty = sess.run([train_op, iterator.is_empty])
If you're using a MonitoredSession and its variants you should still be able to do this:
with tf.train.SingularMonitoredSession() as sess:
    while not sess.should_stop():
        sess.run(train_op)
Edit: Sorry, it totally works. I just realized it exits the session context entirely, but the program keeps running. The .end() calls on hooks work as well. @jimfleming you're right again ;)
I'm not having success with the monitored session suggestion. The program simply exits without even reporting the exception.
with tf.train.MonitoredTrainingSession() as sess:
    while not sess.should_stop():
        data = sess.run(next_batch)
    print('stopped')
the above code prints nothing, just exits after attempting to run past the last batch. However, the following will print "hit exception" forever, unless break is used.
with tf.train.MonitoredTrainingSession() as sess:
    while not sess.should_stop():
        try:
            data = sess.run(next_batch)
        except tf.errors.OutOfRangeError:
            print('hit exception')
    print('stopped')
Not sure if I should make a feature request or bug report?
Do the High Performance Benchmarks example recommendations still apply now that the new Dataset API exists?
For instance in the benchmark RecordInput is split into minibatches, which I tried to incorporate via #10143 just before I found this. If those recommendations are all good ones, the datasets API might benefit from the addition of those very same recommendations.
Might I suggest updating the benchmarks in accordance with this API and vice versa?
I've had my head down for a while, so there's lots to respond to here:
@sjperkins: Adding Dataset.chain() and Dataset.product() iterators shouldn't be too hard. I'd like to understand your use case a little better. Do you imagine that most uses will combine exactly two datasets (and hence we might use method chaining to combine them, e.g. ds1.chain(ds2), ds1.product(ds2)) or will it be more common to combine more datasets (and hence we'd take a similar approach to Dataset.zip(), e.g. Dataset.chain([ds1, ds2]), Dataset.product([ds1, ds2]))? Also note that, if you need product() in the short term, I think you can write ds1.flat_map(lambda x: tf.contrib.data.Dataset.zip((tf.contrib.data.Dataset.from_tensors(x).repeat(), ds2))). You could also fake out chain() with Dataset.flat_map() and tf.cond() but that would be quite ugly :).
@snnn: Thanks for kicking the tires! It's great to hear that the new API brought a speedup for your code... we've definitely favored flexibility over performance with the initial version of the API, but look out for improvements over the coming versions.
The idea of supporting seekable file formats is very appealing, and we're figuring out a good API for that. Right now you can use Dataset.skip() and Dataset.take() to select a sub-dataset from the files, but it is not very efficient, because they materialize the skipped-over inputs before discarding them. I could imagine adding an Iterator::Seek(size_t n) method internally, which would allow iterators to specialize their behavior in this case (or fall back to using GetNext() in a loop). This also seems like it would be important for checkpointing iterators, which might be useful for fault tolerance.
@omoindrot: I agree that the try-except construction is pretty ugly, and we should try to find ways to improve it. The current version is designed to be a drop-in replacement for the queues, which use tf.errors.OutOfRangeError to signal completion, and various other classes are designed to catch that exception. Exposing an iterator.is_empty property would be possible, but it would be tricky to make it work in the way you suggest, because (to avoid an exception being raised) you'd need to guard the training subgraph with a tf.cond(iterator.is_empty, lambda: tf.no_op(), make_train_op). Another possibility would be to change the Python API for iterators to use two ops: Iterator.move_next() and Iterator.get_current() (e.g. like the C# IEnumerator protocol), but that would introduce an additional sess.run() call, and make it harder to share the iterator between threads.
One possibility I've considered is to create a wrapper that turns an Iterator-consuming step into a Python iterator. e.g. Some straw-man code:
def iterate_step(sess, fetches):
    cached_step = sess.make_callable(fetches)
    try:
        while True:
            yield cached_step()
    except tf.errors.OutOfRangeError:
        pass

# ...
for _, step, loss in iterate_step(sess, [train_op, global_step, loss]):
    if step % 100 == 0:
        # Run periodic eval, e.g.
@jimfleming and @vonclites: Thanks for looking into the MonitoredSession integration. It's great to hear that it "just works"! I think we'll still need to do something better for the more advanced cases where we might want to reinitialize an iterator in the same session.
@ahundt: From our initial experiments, the peak performance of the benchmark input pipeline is still slightly higher than what you get from using the tf.contrib.data API. However, the peak performance is much higher than the throughput of actually using the data to train a model like Inception or ResNet, so you might not notice the difference in regular use. We're investigating how to close the gap, and it's very likely that we'll incorporate some of the ideas from the benchmark code into the Dataset implementation.
In particular, one current limitation of the Dataset and Iterator implementation is that the entire pipeline runs on a single device, whereas the more explicit code in the benchmarks is able to split the processing across multiple CPU and GPU devices. For the best performance, it's going to be important to integrate optimizations like the StagingArea for prefetching data to the GPU before it is needed, and we're working on a way to do that more transparently. For now, you can manually pipeline the output of an Iterator.get_next() op with the StagingArea.put() op in a similar manner to the benchmark code.
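For anyone looking for that manual pipelining, a rough sketch of the pattern being described (this assumes tf.contrib.staging.StagingArea; the exact module path may differ between versions, and the data here is a stand-in):

import numpy as np
import tensorflow as tf

images_np = np.random.rand(100, 224, 224, 3).astype(np.float32)  # stand-in data
labels_np = np.zeros(100, dtype=np.int64)

dataset = tf.contrib.data.Dataset.from_tensor_slices((images_np, labels_np))
dataset = dataset.batch(32).repeat()
images, labels = dataset.make_one_shot_iterator().get_next()

with tf.device('/gpu:0'):
    stage = tf.contrib.staging.StagingArea(dtypes=[tf.float32, tf.int64])
    stage_op = stage.put([images, labels])       # copies the next batch to the GPU
    staged_images, staged_labels = stage.get()   # what the model should consume

# Each step runs the model on the previously staged batch while staging the next:
#   sess.run(stage_op)              # warm-up: stage the first batch
#   sess.run([train_op, stage_op])  # repeat for every training step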
This is fantastic! I'm using it now and loving it. @vrv @mrry is the ability to swap between train and validation datasets already there or is that still coming? I see the reinitializable iterators give us the ability to use the same iterator with multiple datasets but each time you run an init op, it essentially starts over on that dataset. I'm currently using the new Dataset and Iterator apis along with tf.cond to accomplish this but is there a more direct/natural way in the works?
Hi @mrry
The Iterator API doesn't have a read_up_to/enqueue_many-like interface. Will you add it?
@sjperkins: Adding Dataset.chain() and Dataset.product() iterators shouldn't be too hard. I'd like to understand your use case a little better. Do you imagine that most uses will combine exactly two datasets (and hence we might use method chaining to combine them, e.g. ds1.chain(ds2), ds1.product(ds2)) or will it be more common to combine more datasets (and hence we'd take a similar approach to Dataset.zip(), e.g. Dataset.chain([ds1, ds2]), Dataset.product([ds1, ds2]))?
@mrry The approach similar to Dataset.zip(), please, for the flexibility. Thanks for the workarounds. :-)
Wanted to let you know I migrated from queues to Dataset and it's been great. Definitely going in the right direction. There's been a few things that are currently missing and I had to work around:
- tf.batch supports SparseTensor; automatic batching of SparseTensor would make my life a whole lot easier.

Additionally, I had this idea where you could maybe implement random test/train split functionality right into Dataset. This could make things easier too. Something like
dataset_train, dataset_test = dataset.split_train_test(test_ratio=0.2, seed=1234)
Keep up the good work!
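Until something like split_train_test exists, a rough approximation with the current ops (a sketch; it assumes the dataset size is known and relies on a fixed seed so that both pipelines see the same shuffled order):

import tensorflow as tf

num_examples = 1000
num_test = int(0.2 * num_examples)

dataset = tf.contrib.data.Dataset.range(num_examples)
dataset = dataset.shuffle(buffer_size=num_examples, seed=1234)  # deterministic order

dataset_test = dataset.take(num_test)
dataset_train = dataset.skip(num_test)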
@lhlmgr The way I understand that example is that the iterator needs to be reinitialized every time you switch between train and validation. This isn't the end of the world, but for large datasets where we are shuffling minibatches, we want a reasonable buffer_size, which means each initialization is quite slow. I find the tf.cond approach with two separate datasets/iterators to work better/faster in that case. That way we can periodically run through validation data without losing our place in the training set.
Maybe here is a good place to refer to my Dataset related questions:
https://stackoverflow.com/questions/44132579/feed-data-into-a-tf-contrib-data-dataset-like-a-queue
https://stackoverflow.com/questions/44132307/tf-contrib-data-dataset-repeat-with-shuffle-notice-epoch-end-mixed-epochs
I really like the new Dataset/Iterator API! Here is a feature that would help my use-case:
I would like to be able to create iterators that share part of the data pipeline. As a simple example, something like this:
a = np.arange(12)
data = tf.contrib.data.Dataset.from_tensor_slices(a)
data = data.shuffle(12)
data0 = data.map(my_func0)
data1 = data.map(my_func1)
iter0 = data0.make_one_shot_iterator()
iter1 = data1.make_one_shot_iterator()
op0 = iter0.get_next()
op1 = iter1.get_next()
What I'd like is for op0 and op1 to output elements in the same order (because they share the shuffle step), but with different functions (my_func0/my_func1) applied. That is, I would like to create input pipelines that share some processing, and then diverge at some point for additional processing.
What @drasmuss suggests is very useful for segmentation tasks where both labels and images need to be augmented. For example the images could very reasonably use bilinear interpolation, but interpolating label values is not okay because a label pixel boundary of 0 and 2 should not be interpolated to the completely different label of 1.
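One workaround with the current API is to keep image and label together in a single map() call so they share the same random crop parameters, and only the resampling method differs (a sketch; sizes are arbitrary and the image is assumed to be float32 [H, W, 3] with an integer [H, W, 1] label):

import tensorflow as tf

def augment(image, label):
    # Concatenate so a single random crop is applied identically to both.
    combined = tf.concat([image, tf.cast(label, tf.float32)], axis=-1)
    combined = tf.random_crop(combined, size=[256, 256, 4])
    image, label = combined[..., :3], combined[..., 3:]
    # Bilinear for the image, nearest neighbour for the label so that class
    # ids are never interpolated to in-between values.
    image = tf.image.resize_images(image, [128, 128],
                                   method=tf.image.ResizeMethod.BILINEAR)
    label = tf.image.resize_images(label, [128, 128],
                                   method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    return image, tf.cast(label, tf.int32)

# dataset = dataset.map(augment)  # dataset of (image, label) pairs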
I am only starting to read into the new API, but I want to share two problems that I had with the old input queues in conjunction with using MonitoredSession with SessionRunHooks.
We also used two separate queues, one handling input data_files as string names and the other one the resulting input data with preprocessing being done in between those two.
We needed to make sure that the enqueue operation fills at least a certain multiple of the batch size into the first queue for our code to run without problems (otherwise the second input queue stalled).
Looking something like this
queue_size_train = sess.run([ipc.train.rsq_pre_size])[0]
while queue_size_train <= batch_size * 5:
    sess.run([ipc.train.rsq_pre_enq],
             feed_dict={ipc.train.ph_in: dataset.train.inputs,
                        ipc.train.ph_tgt: dataset.train.targets})
    queue_size_train = sess.run([ipc.train.rsq_pre_size])[0]
Now when I switched from a normal Session to using MonitoredSession and added a logging hook telling it to log the 'accuracy' tensor, it tried in vain to evaluate it on the first session run call, as the hook had added that tensor to the fetch list; but with the queue still being empty there was no way to evaluate accuracy yet.
Problematic was that the program just stopped and waited for some process to begin filling the queue, but there was none, so it just did nothing, but also didn't throw an exception or give any kind of warning, which made understanding what was happening a bit difficult.
The problem was eventually easily solved just executing one enqueue operation as the very first sess.run() call within the with MonitoredSession(...) as sess: block, but it would be nice if e.g. the input queues could be pre-filled with some initial values upon creation, so that this issue doesn't arise.
Aside from that, we use two different input pipelines for training and validation data that we connect to the network part of our graph alternating through a switch implemented through tf.QueueBase.from_list.
Now in the with ... as sess: block you can easily implement an if block which pipeline to choose based on global_step % interval == 0 but that means copy pasting the same code (sess.run() and add_summary() calls) times the number of different input pipelines you use (e.g. train, validation_1, validation_2, ...)
It would be nice to integrate this directly into MonitoredSession using Hooks somehow (i.e. every 10th step create and save summaries using validation_1 input pipeline, every 25th step ...)
Thanks for the hard work. I'm digging the new API.
In the last couple of days I was playing around quite a bit with the new input API, and I think the Dataset and Iterator classes greatly improve the clarity and readability of code (compared to the old input queues). Also, switching between e.g. the training and validation dataset within one session works quite effortlessly.
But I have also some questions and suggestions.
Maybe first a question: Is the Dataset class implemented based on queues? From the post here it isn't clear to me whether it is or not. In TensorBoard there is no additional information added with the new API about the status of any queue (how many objects are currently queued). Also, observing my CPU/GPU resources/workload I can see that the GPU workload often drops to zero (I guess in between batches).
Then a suggestion:
I think dataset.shuffle() could be improved if shuffling were not done only on the n (= buffer_size) samples in memory, but somehow on the whole list of inputs. For example: I'm storing paths to images and labels in a text file. If shuffling is not already done in the text file, you can often have thousands of consecutive lines of the same class. If I now only work with dataset.shuffle(), it can easily happen (depending on the buffer_size) that all elements that get shuffled are of the same class anyway. Maybe a toy example (ignoring labels and only working with image paths) will make my point clearer. For readability I work with a very small buffer_size and list of file names, but I guess everyone can imagine the same with thousands of filenames in the list and a buffer_size of e.g. 5000.
import tensorflow as tf
image_paths = tf.constant(['train/class1/img1.png',
'train/class1/img2.png',
'train/class1/img3.png',
'train/class1/img4.png',
'train/class1/img5.png',
'train/class1/img6.png',
'train/class1/img7.png',
'train/class2/img1.png',
'train/class2/img2.png',
'train/class2/img3.png',
'train/class2/img4.png',
'train/class2/img5.png',
'train/class1/img6.png',
'train/class1/img7.png'])
dataset = tf.contrib.data.Dataset.from_tensor_slices(image_paths)
dataset = dataset.shuffle(3)
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
with tf.Session() as sess:
    while True:
        try:
            path = sess.run(next_element)
            print(path)
        except tf.errors.OutOfRangeError:
            break
This would print something like:
'train/class1/img1.png'
'train/class1/img2.png'
'train/class1/img4.png'
'train/class1/img3.png'
'train/class1/img5.png'
'train/class1/img7.png'
'train/class2/img1.png'
'train/class1/img6.png'
'train/class2/img3.png'
'train/class2/img4.png'
'train/class1/img6.png'
'train/class2/img5.png'
'train/class1/img7.png'
'train/class2/img2.png'
So since there is only shuffling between the 3 examples in the buffer, the first samples (same for batches) will all be of one class only. So unless the shuffling is already done in the list of filenames, you'll have trouble training any network. And if the available dataset of images is huge, increasing the buffer_size is often not a solution.
Another problem I see is that, the way shuffling is currently implemented, no true shuffling of the entire dataset is possible. The only workaround I found was pre-shuffling the file list I read from the text file before creating the dataset. But once the dataset is created, it's only possible to shuffle within the range of the buffer_size.
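For reference, a minimal sketch of that workaround: shuffle the full file list in Python before building the dataset, and keep a smaller in-graph shuffle on top ('train_files.txt' is a placeholder):

import random
import tensorflow as tf

with open('train_files.txt') as f:           # placeholder: one image path per line
    image_paths = [line.strip() for line in f]

random.shuffle(image_paths)                  # full shuffle over the whole list

dataset = tf.contrib.data.Dataset.from_tensor_slices(image_paths)
dataset = dataset.shuffle(buffer_size=5000)  # additional local shuffling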
@mrry Thanks for the preview of the new API; I think this is a good starting point!
One function that still seems to be missing, but would be essential for one of our primary use cases (see comment above: https://github.com/tensorflow/tensorflow/issues/7951#issuecomment-283186552) is a
Dataset.map_to_multiple()
used as in
dataset2 = dataset1.map_to_multiple(func)
function, where one element of dataset1 is mapped to one or more elements for dataset2; i.e. #dataset2 >= #dataset1.
As far as I understand it, Dataset.map() preserves a 1:1 mapping, which is not sufficient for our use case.
One concrete example of why this would be useful:
Assume dataset1 is a list of large images (e.g. 8192x8192 each) [with corresponding labels]. Then, dataset2 is created by (randomly) iterating over each element d1 of dataset1, and for each of these elements the function func samples a (variable) number of sub-images [and sub-labels] (e.g. 256x256 each) from d1, taken from various regions of d1. For example, in one instance, func might return 142 new {image, label} pairs that will be added to dataset2. In another instance, it might return 389 new pairs, etc. The number of elements generated each time is variable and conditional on the properties of element d1.
@kmhofmann I think you can map one example to multiple examples with Dataset.flat_map(). Inside your flatmap function, create a new Dataset object with one or multiple examples for every input example.
@EdeMeijer That could be -- it's really hard to tell, as the documentation is quite sparse, with no examples. (There seem to be two places containing some amount of separate documentation: either on GitHub or on tensorflow.org)
One thing I notice is that the arguments num_threads and output_buffer_size from map() are missing in flat_map(); does that mean no parallel processing is possible? Or is this a TODO? Hard to scope both functionality and feature set...
I based my suggestion on the one flat_map code example on the github page you linked. There, single string tensors come in (file names) and whole Datasets are emitted in the map function, so that seems pretty clear. I guess the parallel processing from map() is a TODO, I'm sure they'd love a PR :)
Ah, thanks, I missed that, as it was in an example about text processing. Still, the function description in the documentation seems a bit sparse, consisting of "Maps map_func across this dataset and flattens the result."
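To spell out the flat_map() suggestion for the large-image case, a sketch in which extract_patches_np is a hypothetical user function returning a variable number of 256x256 crops per image:

import numpy as np
import tensorflow as tf

# Hypothetical: sample a variable number of crops (and labels) from one large
# image; the count can depend on the image content.
def extract_patches_np(image, label):
    k = np.random.randint(100, 400)
    patches = np.random.rand(k, 256, 256, 3).astype(np.float32)
    patch_labels = np.repeat(label, k).astype(np.int64)
    return patches, patch_labels

def to_patch_dataset(image, label):
    patches, patch_labels = tf.py_func(
        extract_patches_np, [image, label], [tf.float32, tf.int64])
    patches.set_shape([None, 256, 256, 3])   # py_func loses static shape info
    patch_labels.set_shape([None])
    return tf.contrib.data.Dataset.from_tensor_slices((patches, patch_labels))

# dataset2 = dataset1.flat_map(to_patch_dataset)  # so that #dataset2 >= #dataset1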
input_fn has to return features and labels only. What about extra params which can be used during training to customize the loss for a given input?
Another feature request: it'd be great if there was an iterator.peek() operator, which would return the current iterator value (like iterator.get_next()), but not advance the iterator. This would make it easier to coordinate multiple elements of a model that all want to read from the iterator before advancing it one step.
Hi, firstly thanks for this API; I'm very keen on using it. Primarily I am interested in using it to switch between training and validation datasets in the same process.
However I'm confused how one does that in this new paradigm. For instance I see no way to "get an iterator" in the middle of a dataset. As an example here is a piece of code that demonstrates what I'd like to do. Every few steps in an epoch, I'd like to run a validation op, but the output of this code shows that the iterator never advances ahead of item 0 in either dataset. How does one do that?
training_dataset = tf.contrib.data.Dataset.range(100)
validation_dataset = tf.contrib.data.Dataset.range(900, 950)

iterator = tf.contrib.data.Iterator.from_structure(training_dataset.output_types,
                                                   training_dataset.output_shapes)
next_element = iterator.get_next()

training_init_op = iterator.make_initializer(training_dataset)
validation_init_op = iterator.make_initializer(validation_dataset)

# Run 3 epochs with 10 steps each
for e in range(3):
    print("Epoch: %d" % e)
    for step in range(11):
        sess.run(training_init_op)
        ne = sess.run(next_element)
        print("Step: %d Training set: next_element: %d " % (step, ne))
        # Run validation every 5 steps only
        if step % 5 == 0:
            sess.run(validation_init_op)
            ne = sess.run(next_element)
            print("Step: %d Validation set: next_element: %d " % (step, ne))
In the above, since we run an init each time to get an iterator pointing to its required dataset, we end up running a training and validation on the first item of each dataset, always. How does one modify this to get the updated position of the iterator in each dataset?
@jasonkriss , I see - thanks! This is what I have now, and it handles what I wanted. Is this what you meant?
training_dataset = tf.contrib.data.Dataset.range(100)
validation_dataset = tf.contrib.data.Dataset.range(900, 950)

t_iterator = tf.contrib.data.Iterator.from_structure(training_dataset.output_types,
                                                     training_dataset.output_shapes)
v_iterator = tf.contrib.data.Iterator.from_structure(validation_dataset.output_types,
                                                     validation_dataset.output_shapes)

is_validating = tf.placeholder(dtype=bool, shape=())
next_element = tf.cond(is_validating, lambda: v_iterator.get_next(), lambda: t_iterator.get_next())

training_init_op = t_iterator.make_initializer(training_dataset)
validation_init_op = v_iterator.make_initializer(validation_dataset)

sess.run([training_init_op, validation_init_op])

# Run 3 epochs with 10 steps each
for e in range(3):
    print("Epoch: %d" % e)
    for step in range(11):
        ne = sess.run(next_element, feed_dict={is_validating: False})
        print("Step: %d Training set: next_element: %d " % (step, ne))
        if step % 5 == 0:
            ne = sess.run(next_element, feed_dict={is_validating: True})
            print("Step: %d Validation set: next_element: %d " % (step, ne))
@nirmalthacker Yep, that's essentially what I meant. Although, at each validation step, I will reinit the validation iterator and run through the full validation dataset. But that's certainly the gist of it.
I'm trying to migrate my input pipeline from tf.train.string_input_producer & tf.train.shuffle_batch to the Dataset API. The parameter "allow_smaller_final_batch" in tf.train.shuffle_batch(...) is useful when I'd like to ensure all batches are evenly divisible by the number of GPUs. (I'm doing data parallelization on multiple GPUs, and the batch_size is a multiple of num_gpus.) Is there any setting in the Dataset API to drop the final smaller batch, if any?
@winston-li I think you could just use .filter() for that. If you know your batch size is for example 32, then something like
dataset = dataset.filter(lambda batch: tf.shape(batch)[0] == 32)
should do the trick.
@ppwwyyxx I believe a custom pipeline could be designed using the TF server with the distributed runtime, i.e. IPC send/recv ops. It's included in the C API, so it won't be too hard to port it to other languages. A downside is that you do need to package the whole TF runtime wherever you need this pipeline.
We are actually working on an out-of-band data plane for TF, but it is still a great deal of ongoing work. The design would be similar to the ExternalShuffleService in Spark, using an in-memory storage such as LevelDB or LMDB, and a reader client in TF. If performance is a primary concern then it should be tightly integrated with the hardware, i.e. GPU/NVMe/RNIC, etc.
@EdeMeijer Thanks. I thought it should work, but after some experiments I can't make it work as expected. I followed the guidelines of the dataset README.md, with pseudocode like the following:
def _parse_function(example_proto):
    features = {"image": tf.FixedLenFeature((), tf.string, default_value=""),
                "label": tf.FixedLenFeature((), tf.int64, default_value=0)}
    parsed_features = tf.parse_single_example(example_proto, features)
    return parsed_features["image"], parsed_features["label"]

BATCH_SIZE = 256

filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.contrib.data.TFRecordDataset(filenames)
dataset = dataset.map(_parse_function)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.filter(lambda imgs, lbls: tf.shape(imgs)[0] == BATCH_SIZE)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()
images, labels = next_element

# Training cycles for 100 epochs.
for _ in range(100):
    sess.run(iterator.initializer)
    while True:
        try:
            images_r, labels_r = sess.run([images, labels])
            print(images_r.shape)
        except tf.errors.OutOfRangeError:
            break
After applying the filter, no data is available in the training cycles. I found the dataset (after batch, prior to filter) was in this form:
(<tf.Tensor 'arg0:0' shape=(?, 43200) dtype=float32>, <tf.Tensor 'arg1:0' shape=(?, 36) dtype=float32>)
Looks like the batch dimension is "?" (None?), so the predicate always fails... or did I do something wrong?
@winston-li seeing "?" as shape is because at that point you're looking at the 'static' shape of the tensor, which isn't always defined (the graph doesn't know in advance how many examples there will be). However, tf.shape() evaluates the dynamic, real-time shape of a tensor, so I thought this should work.
Anyway, I tried creating a minimal example, but I'm getting internal kernel errors when my filter excludes any element, so for starters I opened https://github.com/tensorflow/tensorflow/issues/10725. However, what I did find was that, obviously, we should use Tensorflow ops instead of standard comparisons since the result of tf.shape() is a tensor and not a normal array.
My tf version's filter is broken, but if yours works could you try this instead?
dataset = dataset.filter(lambda imgs, lbls: tf.equal(tf.shape(imgs)[0], BATCH_SIZE))
@EdeMeijer Thank you very much, it works in my case :-)
For sequence data, it is common to batch sequences with same (or similar) lengths. This is usually achieved by simply sorting the dataset based on sequence length. However, this does not seem possible to achieve with the current API (if yes, please let me know how to do it).
It would be nice to see something in this flavor:
dataset = dataset.sort(lambda a, b: tf.shape(a)[0] < tf.shape(b)[0], buffer_size=10000)
which will sort items by chunk of 10000 (similar to dataset.shuffle) using the given comparison function.
I would like to ask again (since there was no reply to my comment above): am I right that the new input pipeline isn't implemented using queues? I did some tests using the new input pipeline to load and preprocess images, but it seems everything is done sequentially and there is only a negligible performance improvement over using e.g. OpenCV to load and preprocess images. I was hoping the new input pipeline would be built on top of queues, since they provide major performance boosts but are quite hard to work with (e.g. needing separate queue-based input pipelines to switch between training and validation datasets). Switching is quite easy with the new API, but there seems to be no real performance boost. Has anybody observed the same, or the opposite?
@kratzert I, too, have experienced issues with getting GPUs to 100% usage and keeping them there. The Dataset API, it seems, is implemented less efficiently, and though it is a welcome change regarding code clarity and simplicity, as well as a more natural way of doing training and validation, it cannot (yet) substitute for queues in high data rate use cases such as computer vision.
@kratzert You are right. The new input pipeline does not use queues. If you aren't satisfied with the shuffling it provides, you can do it outside TF: load all the filenames into memory and then shuffle them in any way you like.
@snnn Yes, I know, and I do exactly this. But by doing this I can't find a way to shuffle my entire training dataset every epoch. I can shuffle e.g. the list of filenames before creating a dataset, but once I start a session, to my knowledge I can only shuffle the data from the dataset that I have in memory using dataset.shuffle(buffer_size). With images this can hardly be done for the entire dataset in memory. And I can't shuffle the filenames again and create a new dataset from them once inside the session, or am I wrong?
@vvekic Thanks for your reply, so I know it's not only me having performance issues. Of course the code clarity and simplicity of working with the new dataset class is a huge step forward and very welcome. But it seems that for training computer vision networks queues are still the way to go (unfortunately) as the performance boost is immense.
@kratzert Using queues increases your performance by overlapping data loading with computation, and this is independent of what you use to load your data. You can always insert queues or a StagingArea into the input pipeline, regardless of whether the actual data loading is done by the Dataset API, the old input operators, or Python.
@byronyi What I meant is to receive tensors from non-TF processes. Because as @PatWie pointed out above, data processing doesn't really need to happen in the graph.
@ppwwyyxx do you have any example code for combining queues and the new dataset api? Sounds awesome and I will definitely try this out later.
@snnn Well, I'll look into it, but since my knowledge of C++ isn't so profound, we'll see how successful I'll be. Anyway, I think this could be a feature that more people than just me might be interested in, and it should/could possibly be integrated into master.
@kratzert https://www.tensorflow.org/performance/performance_models and the associated code shows how to use StagingArea.
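The double-buffering pattern used there boils down to roughly the following sketch (the toy numpy dataset and shapes are placeholders; the idea is to warm the StagingArea up with one put, then fetch the put op together with each training step):
import numpy as np
import tensorflow as tf
from tensorflow.python.ops import data_flow_ops  # StagingArea lives here in TF 1.x

# Placeholder data standing in for a real preprocessed pipeline.
images_np = np.zeros([100, 8, 8, 3], dtype=np.float32)
labels_np = np.zeros([100], dtype=np.int64)
dataset = tf.contrib.data.Dataset.from_tensor_slices((images_np, labels_np))
dataset = dataset.batch(32)
images, labels = dataset.make_one_shot_iterator().get_next()

# Stage the next batch while the current one is being consumed downstream.
stage = data_flow_ops.StagingArea(dtypes=[images.dtype, labels.dtype])
stage_op = stage.put([images, labels])
staged_images, staged_labels = stage.get()

with tf.Session() as sess:
    sess.run(stage_op)  # warm-up: stage the first batch
    for _ in range(2):
        # Each step consumes one staged batch and stages the next one.
        imgs, _ = sess.run([staged_images, stage_op])
        print(imgs.shape)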
@kratzert it is certainly possible to re-shuffle the filenames for every epoch, I'm doing exactly that for my own training. You can use an initializable iterator together with a placeholder to achieve this. I'm doing something like this:
filenames_op = tf.placeholder(tf.string, shape=[None])
dataset = tf.contrib.data.TFRecordDataset(filenames_op)
iterator = dataset.make_initializable_iterator()
next_elem = iterator.get_next()

filenames = ['file1', 'file2', 'file3']

# For every epoch, shuffle the file names and initialize the iterator
random.shuffle(filenames)
sess.run(iterator.initializer, {filenames_op: filenames})
@EdeMeijer That's smart. Thank you very much. I should have come to this on my own! Here is a complete working code snippet for anybody interested:
import tensorflow as tf
import numpy as np

num_epochs = 2

image_paths = ['train/class1/img1.png',
               'train/class1/img2.png',
               'train/class1/img3.png',
               'train/class1/img4.png',
               'train/class1/img5.png',
               'train/class1/img6.png',
               'train/class1/img7.png',
               'train/class2/img1.png',
               'train/class2/img2.png',
               'train/class2/img3.png',
               'train/class2/img4.png',
               'train/class2/img5.png',
               'train/class1/img6.png',
               'train/class1/img7.png']

num_samples = len(image_paths)

filenames_op = tf.placeholder(tf.string, shape=[None])
dataset = tf.contrib.data.Dataset.from_tensor_slices(filenames_op)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

with tf.Session() as sess:
    for epoch in range(num_epochs):
        print("Starting epoch %i" % epoch)
        np.random.shuffle(image_paths)
        sess.run(iterator.initializer, {filenames_op: image_paths})
        for i in range(num_samples):
            path = sess.run(next_element)
            print(path)
This shuffles the entire dataset on every epoch, as desired, and gives output e.g. like this:
Starting epoch 0
'train/class2/img4.png'
'train/class2/img1.png'
'train/class1/img2.png'
'train/class1/img7.png'
'train/class1/img1.png'
'train/class1/img6.png'
'train/class1/img6.png'
'train/class1/img5.png'
'train/class1/img4.png'
'train/class1/img7.png'
'train/class1/img3.png'
'train/class2/img2.png'
'train/class2/img3.png'
'train/class2/img5.png'
Starting epoch 1
'train/class1/img7.png'
'train/class1/img4.png'
'train/class2/img1.png'
'train/class1/img6.png'
'train/class1/img2.png'
'train/class1/img3.png'
'train/class1/img6.png'
'train/class2/img5.png'
'train/class2/img2.png'
'train/class1/img1.png'
'train/class2/img3.png'
'train/class2/img4.png'
'train/class1/img5.png'
'train/class1/img7.png'
Currently the tutorial says that we can use
dataset = dataset.repeat()
dataset = dataset.shuffle(buffer_size=10000)
to get shuffled data. The pattern is also used in tf.contrib.data.read_batch_features
However, calling repeat before shuffle can cause elements to be shuffled across epoch boundaries.
For example, the following code
import tensorflow as tf

data = tf.contrib.data

dataset = data.Dataset.from_tensor_slices(['file_0', 'file_1'])
repeat_count = 5
shuffle_buffer_size = 100

repeat_then_shuffle = dataset.repeat(count=repeat_count)
repeat_then_shuffle = repeat_then_shuffle.shuffle(
    buffer_size=shuffle_buffer_size)
repeat_then_shuffle_iter = repeat_then_shuffle.make_one_shot_iterator()
get_next = repeat_then_shuffle_iter.get_next()

with tf.Session() as sess:
    result = []
    try:
        while True:
            result.append(sess.run(get_next))
    except tf.errors.OutOfRangeError:
        pass
    print(result)  # [b'file_0', b'file_0', b'file_0', b'file_1', b'file_0', b'file_1', b'file_0', b'file_1', b'file_1', b'file_1']
gets three file_0 elements before the first file_1.
Are there any concerns about calling shuffle before repeat?
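For comparison, the ordering asked about (shuffle before repeat) is a two-line change to the snippet above; each repetition then draws from a single pass over the data, so epochs are not mixed together:
shuffle_then_repeat = dataset.shuffle(buffer_size=shuffle_buffer_size)
shuffle_then_repeat = shuffle_then_repeat.repeat(count=repeat_count)
shuffle_then_repeat_iter = shuffle_then_repeat.make_one_shot_iterator()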
Just want to add something here: I implemented a multiprocess-based data feeding pipeline for multi-task learning. It achieves avg. GPU utilization >90% and quad-core CPU utilization >95%, is less prone to memory leaks, and is particularly good for days-long training. Not saying it's perfect, but it at least works much better than the current TF queue API in my case. If anyone is interested: https://hanxiao.github.io/2017/07/07/Get-10x-Speedup-in-Tensorflow-Multi-Task-Learning-using-Python-Multiprocessing/
That has already been done in TensorPack for a while now by @ppwwyyxx. There you also get a further speedup using ZMQ -- plus it has a nice interface using Python generators. For me, the way tensorpack handles input data is the most elegant approach. I hope to see something like this in a future TF.
@PatWie thanks for pointing this out! I just quickly checked @ppwwyyxx's repo; really awesome! Thanks again.
It would be great to have GPU resident queues.
@xieqihuiPG See StagingArea and MapStagingArea
Would greatly appreciate:
- Dataset.sample_random()
- Dataset.update(), Dataset.pop() etc., e.g. for creating streaming buffers, replay memory objects...
- Dataset.describe()

What about supporting custom ops to create a Dataset? For example, let's say I have a Python function which returns a new batch on each call (a generator). I want to wrap this function using tf.py_func and use it to build a Dataset. This doesn't seem to be supported?
I currently use this method with tf.train.*batch* ops and it works nicely but I'd like to find a way to do this for evaluation as well (and figured maybe Dataset is a good way to do this with the "reintializable" iterator).
@mrry This is great work and definitely very useful for creating nice learning APIs on top of TensorFlow. However, I have a couple of main concerns:
1. Consider a model with a train/fit method and an infer/predict method. Let's call the type of the (potentially) nested structure of inputs to our model I, and the type of training inputs, which are only needed when training (e.g., supervision labels), TI. In this case, we want the train method to accept datasets with elements of type (I, TI) (i.e., a tuple of I and TI) and the predict method to accept datasets with elements of type I or (I, TI) (in which case it would ignore the labels). We also want the model to have only one underlying graph, supporting all these types of input. The way I could see doing that was for the underlying model to construct two iterators (one with element type I and one with type TI) and initialize them according to the provided datasets. However, if somebody provides a dataset with elements of type (I, TI) to the train method, there is no way to unzip this dataset and initialize both iterators. One has to use Dataset.map twice, which is not efficient (I think, but please correct me if I'm wrong) and which may also not pull matching elements from the datasets (if each pull advances the current index in the original first dataset -- I'm not sure if that happens).
2. Feeding data dynamically seems to require creating a new TensorDataset per batch and re-initializing an existing iterator.

Also, if my description is terribly unclear, please let me know and I'll try to clarify.
The new input pipelines are great! But unfortunately, we are unable to use them for large-scale training because our data preprocessing is quite costly and needs to be distributed across multiple machines--or we just haven't figured out the right way to do it.
We have thus been using the old FIFOQueue interface in the following manner (pseudocode):
# Set up queues
kwargs = {'capacity': ..., 'dtypes': ..., 'names': ..., 'shapes': ...}
train_queue = tf.FIFOQueue(**kwargs)
valid_queue = tf.FIFOQueue(**kwargs)
queue_index = tf.Variable(0, trainable=False)
queue = tf.QueueBase.from_list(queue_index, [train_queue, valid_queue])

batch_size = ...
batch = queue.dequeue_many(batch_size)

# Build model
output = build_model(batch['X'])
loss = evaluate_loss(output, batch['y'])

# Fill queues
train_data_stream = some_iterable_for_training_data()
validation_data_stream = some_iterable_for_validation_data()
start_filling_queues_in_background_thread(train_queue, train_data_stream)
start_filling_queues_in_background_thread(valid_queue, validation_data_stream)
Having two different queues with from_list allows us to switch between the training and validation queue by either setting the queue_index or feeding it in the feed_dict.
The some_iterable_for_xxx_data functions are usually generators that get data from a bunch of workers sitting behind a load balancer (e.g. using ZeroMQ, RabbitMQ, or PubSub). This approach works well (because the queues provide a buffer), but we don't have any way of telling when the iterator is exhausted. Some workarounds are:
- Closing the queue, so that tf.errors.OutOfRangeError is raised when the queue is exhausted (but then we can't reopen it again, #4535).
- Setting a timeout on the session.run of the training op and assuming that a timeout is due to the queue being exhausted (but the network connection might be down or our workers might be too slow).
- Adding an exhausted field to the queue names and letting the background thread enqueue an item with exhausted=True, together with an assertion around the dequeue operation (but using dequeue_many will dequeue elements from the next epoch if the number of items per epoch is not an integer multiple of the batch size, see also #2514).

None of these are satisfactory, and it would be great to see either the ability to construct Datasets from a python iterator with a queue for buffering, or a fix for #4535 (which will automatically fix #2514).
Looking forward to hearing whether we've just not been using the Datasets API right.
I think the queues are nice enough. I'd like to see two things improved though:
An easier way of inputting data from native Python (other than using placeholders), and easier management of the filler threads.
Maybe a class InputQueue(delegate, fn, n_filler_threads) that takes a TensorFlow queue delegate and a Python function fn. fn returns a (possibly nested) tuple of np.array or lists. The InputQueue starts n_filler_threads that call fn and put the results on the delegate. The threads are daemons, so they shut down when the main process does.
Anyway, those are just my thoughts. It's probably a lot harder than this due to the static shape requirements of TensorFlow. Maybe you just have to provide the sizes when you create the delegate.
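Something along those lines can already be prototyped with plain Python threads. A rough, hypothetical sketch (the InputQueue name, its constructor arguments, and the hard-coded dtype/shape are illustrative, not an existing TF API):
import threading
import numpy as np
import tensorflow as tf

class InputQueue(object):
    # Daemon threads repeatedly call `fn` and enqueue its result on `delegate`.
    def __init__(self, delegate, fn, n_filler_threads, sess):
        self._fn = fn
        self._sess = sess
        # Sizes are hard-coded here; a real helper would take them as arguments.
        self._placeholder = tf.placeholder(tf.float32, shape=[3])
        self._enqueue_op = delegate.enqueue([self._placeholder])
        for _ in range(n_filler_threads):
            t = threading.Thread(target=self._fill)
            t.daemon = True  # filler threads die with the main process
            t.start()

    def _fill(self):
        try:
            while True:
                self._sess.run(self._enqueue_op,
                               feed_dict={self._placeholder: self._fn()})
        except (tf.errors.CancelledError, RuntimeError):
            pass  # session closed; let the thread exit quietly

queue = tf.FIFOQueue(capacity=100, dtypes=[tf.float32], shapes=[[3]])
batch = queue.dequeue_many(8)

with tf.Session() as sess:
    InputQueue(queue, lambda: np.random.rand(3).astype(np.float32),
               n_filler_threads=2, sess=sess)
    print(sess.run(batch).shape)  # (8, 3)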
I am using the new Dataset API now, but I still have the problem of how to dynamically feed data into a Dataset. There are two similar questions here and here by @albertz.
As you can see, real-world problems involve more than just feeding in a series of images or texts. So I would really appreciate it if you could let me feed the data freely, in terms of both when and how.
I can imagine two options. One is efficient distributed reading through feed_dict; although it is slow, with multi-processing it becomes just a matter of adding machines. The other is to _wrap_ some mature and widely accepted implementation.
Use a placeholder as input to a queue and have the model read inputs from the queue; then use a session-run thread to feed inputs (maybe produced by Hadoop MapReduce) into the queue. With a staging area you can even hide all preprocessing and input time.
I'm trying to test the example in the doc
But it seems that this call is passing only 1 argument to the function:
dataset = dataset.map(_parse_function)
Instead, the function is defined with two parameters.
@eaplatanios one relevant PR for zip/unzip is https://github.com/tensorflow/tensorflow/issues/10837
@mrry Have you tested this section of the documentation with python3?
@bhack I haven't been able to make it work with more than one parameter returned from the function given to py_func. I'm using python3 and haven't tried with python2.
Is your problem similar ?
@AMairesse The first problem was solved with https://github.com/tensorflow/tensorflow/commit/2139e7d8b10764f2245f34548f6fbfc25d29bff8
@bhack Thanks, will try that soon, I was using a workaround which I'm not proud of :-)
The fix in the documentation is one month old and prior to the v1.3 release; isn't the tensorflow.org website updated when there is a new release? The official doc does not have the fix.
@AMairesse I suggest you report this in https://github.com/tensorflow/tensorflow/issues/11786
We need more operators for image processing, like map_coordinates, so we can build an image augmentation pipeline using only TensorFlow.
Also, Dataset does not reliably initialize variables defined in the map function; see https://github.com/tensorflow/tensorflow/issues/12648
I'd like to re-raise an earlier performance-related question by @kratzert that seems to have fallen out of focus. The performance gain of using the new Dataset API is negligible.
@ppwwyyxx stated that queues and StagingArea can still be used with the Dataset API, but I still haven't seen a working example of this. Do we have one?
What purpose does the new API serve if one must still include queues, data_flow_ops or StagingArea complexities?
@vvekic, I experimented a bit with queues and the Dataset API after realising in horror that of the 0.8s/step in my inference loop, 0.2s is data fetching (with GPU at 0% utilization), rising to almost 2 seconds if the HDD is being used by something else at the same time.
My pipeline looks as follows:
def preprocess_image(fn):
    im_s = tf.read_file(fn)
    im = tf.image.decode_jpeg(im_s, channels=3)
    im = inception_preprocessing.preprocess_for_eval(im, width=299, height=299)
    return fn, im

dataset = tf.contrib.data.Dataset.list_files('{}/*/*.jpg'.format(FLAGS.dataset_dir))
dataset = dataset.map(preprocess_image, num_threads=FLAGS.num_threads)
iterator = dataset.make_one_shot_iterator()

input_queue = tf.FIFOQueue(capacity=100*FLAGS.batch_size,
                           dtypes=iterator.output_types,
                           shapes=iterator.output_shapes)
enqueue_sample = input_queue.enqueue(iterator.get_next())
tf.train.add_queue_runner(tf.train.QueueRunner(input_queue, [enqueue_sample]*FLAGS.num_threads))

filenames, images = input_queue.dequeue_up_to(FLAGS.batch_size)
I still have to run this on a big dataset and check if there's any performance improvement, but at least it seems to execute correctly. The catch is, I couldn't find a way to iterate over the data more than once (which luckily enough is not my use-case), because the only iterator that won't raise an error when the QueueRunners spawn the threads is the one_shot_iterator.
I don't know if I'm right here, but I have a question about the Dataset API. My dataset contains one column with sequences and one with sequence lengths, which I want to treat differently, because I want to pad the sequences. Is it possible to address a single column in the dataset so that it is treated differently from the other column? E.g.:
two_column_dataset = ... # This contains the column sequence and sequence length
first_column_dataset = two_column_dataset[0].padded_batch(64, ...) # Pad only first column
second_column_dataset = two_column_dataset[1].batch(64) # Get corresponding sequence length for sequences
two_column_dataset = Dataset.zip((first_column_dataset, second_column_dataset))
Edit: After writing this, I figured it out:
def flat_map_func(sequence, sequence_length):
    first_column_dataset = Dataset.from_tensors(sequence).padded_batch(64, ...)
    second_column_dataset = Dataset.from_tensors(sequence_length).padded_batch(64)
    zipped_dataset = Dataset.zip((first_column_dataset, second_column_dataset))
    return zipped_dataset

two_column_dataset = two_column_dataset.flat_map(flat_map_func)
This issue thread is becoming a bit unwieldy and it's getting hard to keep track of the individual discussions, so I'm going to lock it after responding to a few of the recent comments. Please feel free to open a new issue about any specific topics of feature requests related to tf.contrib.data and we can continue the discussion there.
In response to a few recent questions:
@GPhilo (link) and @kratzert (link): The Dataset API includes methods for prefetching, so it shouldn't be necessary to add a queue here, and you can retain the other advantages of Datasets (like reinitialization etc.). Passing output_buffer_size=100 * FLAGS.batch_size to the dataset.map() call, and following that with dataset.batch(FLAGS.batch_size) will run your preprocess_image function in parallel and should decently increase the performance.
dataset = tf.contrib.data.Dataset.list_files('{}/*/*.jpg'.format(FLAGS.dataset_dir))
dataset = dataset.map(preprocess_image, num_threads=FLAGS.num_threads,
                      output_buffer_size=100*FLAGS.batch_size)
dataset = dataset.batch(FLAGS.batch_size)
iterator = dataset.make_one_shot_iterator()
filenames, images = iterator.get_next()
Note that in TF 1.4 there will be a Dataset.prefetch() method that makes it easier to add prefetching at any point in the pipeline, not just after a map(). (You can try it by downloading the current nightly build.)
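For concreteness, a prefetching version of the same pipeline might look like this (reusing preprocess_image and FLAGS from the snippet above, and assuming the TF 1.4 nightly API for prefetch()):
dataset = tf.contrib.data.Dataset.list_files('{}/*/*.jpg'.format(FLAGS.dataset_dir))
dataset = dataset.map(preprocess_image, num_threads=FLAGS.num_threads)
dataset = dataset.batch(FLAGS.batch_size)
dataset = dataset.prefetch(2)  # keep up to 2 batches ready ahead of the consumer
iterator = dataset.make_one_shot_iterator()
filenames, images = iterator.get_next()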
In response to @kratzert's specific question about the implementation, the Dataset and Iterator classes don't use TensorFlow's previous producer/consumer queues (such as tf.FIFOQueue or tf.RandomShuffleQueue), but they do include simpler (and more efficient) implementations of the core ideas. For example, Dataset.prefetch() will start a background thread to populate an ordered buffer that acts like a tf.FIFOQueue, so that downstream pipeline stages need not block. However, the prefetch() implementation is much simpler, because it doesn't need to support as many different concurrent operations as a tf.FIFOQueue.
@vvekic (link): I'd be curious to see your code before and after trying the Dataset API, and perhaps you could follow up by opening an issue describing the performance bottleneck. Compared to feeding or a (non-StagingArea) queue-based pipeline, the new API should be more efficient, and I'd be curious to know which parts aren't!
At present, you're correct that the StagingArea functionality is not included in the Dataset API, and for peak performance in GPU workloads you will need to add a staging area manually. However, we are actively working on implementing Datasets that can span devices (see 19a55725af8102d72d4e081c5139f0e4bd5a4bb7 for some of the work in progress) and one of the first use cases for that is to support prefetching into GPU memory.
@tengerye (link): For dynamically feeding data into a Dataset, I'd suggest you try out the Dataset.from_generator() method that we're adding to TF 1.4 (and which is available in nightly builds already). I answered @albertz's Stack Overflow question about doing this here. (Supporting distributed pipelines will depend on the cross-device Dataset support that I mentioned in the last answer, and we'll be implementing that soon.) I think this will also work for @rasmusbergpalm's request, because you can create concurrent generators, and for @tillahoffmann's request and @sirfz's request as well. This API is very new though, so if you have any feedback, please let us know!
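A minimal sketch of the from_generator() approach (assuming the nightly-build API mentioned above; the generator, dtypes, and shapes below are placeholders):
import numpy as np
import tensorflow as tf

def gen():
    # Stand-in for any Python data source (e.g. reading from a message queue).
    while True:
        yield np.random.rand(4).astype(np.float32), np.int64(np.random.randint(10))

dataset = tf.contrib.data.Dataset.from_generator(
    gen,
    output_types=(tf.float32, tf.int64),
    output_shapes=(tf.TensorShape([4]), tf.TensorShape([])))
dataset = dataset.batch(32)

iterator = dataset.make_initializable_iterator()
features, label = iterator.get_next()

with tf.Session() as sess:
    sess.run(iterator.initializer)
    f, l = sess.run([features, label])
    print(f.shape, l.shape)  # (32, 4) (32,)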
@jasonkriss (link) We've implemented something called "feedable" iterators, which let you switch the input for a single graph between multiple iterators (e.g. one for training and one for testing). The programmers' guide has more details about how to use this feature.
@guillaumekln (link) If you want to batch sequences with different lengths, you can use the Dataset.group_by_window() transformation. Have a look at how this is used in the NMT model code for an example.
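A rough sketch of such length bucketing, assuming the group_by_window(key_func, reduce_func, window_size) signature (the toy dataset, bucket width, and batch size are illustrative):
import tensorflow as tf

# Toy dataset of variable-length sequences: element n is [0, 1, ..., n-1].
sequences = tf.contrib.data.Dataset.range(1, 11).map(lambda n: tf.range(n))

batch_size = 2

def key_func(seq):
    # Sequences whose lengths fall into the same bucket of width 3 share a key.
    return tf.to_int64(tf.size(seq)) // 3

def reduce_func(key, window):
    # Pad and batch the sequences that were grouped into the same window.
    return window.padded_batch(batch_size, padded_shapes=tf.TensorShape([None]))

batched = sequences.group_by_window(key_func, reduce_func, window_size=batch_size)
next_batch = batched.make_one_shot_iterator().get_next()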
Thanks again to all of you for your continued interest in this part of TensorFlow!
Most helpful comment
Personally, I'm a very big fan of the feed_dict method of feeding data into the graph. It is by far the most flexible, makes debugging way easier, and makes for much simpler code. Thus my biggest wish would be to make that method more performant. Right now, this method starves my GPU all the time, which is a shame because most other DL frameworks (even those based on computational graphs) manage to make this much more performant. I assume there is more copying/handling going on in the background than would be necessary.