Ray: tf.errors.OutOfRangeError error when used with ray tune

Created on 9 Oct 2018  ·  12 Comments  ·  Source: ray-project/ray

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): RHEL 7
  • Ray installed from (source or binary): No
  • Ray version: 0.5.2
  • Python version: 2.7
  • Exact command to reproduce: See below

Describe the problem

I'm trying to optimize my tf.dataset input pipeline using Ray Tune. TensorFlow throws a tf.errors.OutOfRangeError when the pipeline runs inside Ray's environment, but the same code works fine when run independently. In the source code below, test_tfrecords_reader works fine when called directly from main, whereas the same code run as a Ray experiment in _data_fetch_for_search fails with tf.errors.OutOfRangeError after reading zero records. What am I missing here?

Source code / logs

def test_tfrecords_reader(records_to_fetch=10000, timeout_s=60 * 30):
    """Pull batches from the TFRecords pipeline outside of Ray and time it.

    Reads from ``DATA_FILES`` through ``tfrecords_reader`` with fixed pipeline
    settings and stops once ``records_to_fetch`` records were read, once
    ``timeout_s`` seconds elapsed, or once the dataset is exhausted.

    :param records_to_fetch: number of records after which reading stops
    :param timeout_s: wall-clock budget in seconds
    """
    session = tf.keras.backend.get_session()
    fetched_so_far = 0

    pipeline = tfrecords_reader(DATA_FILES,
                                parallel_file_reads=48,
                                batch_size=512,
                                num_parallel_data_fetch=24,
                                buffer_size=250,
                                prefetch_buffer=250)
    batch_op = pipeline.make_one_shot_iterator().get_next()

    began_at = time()
    finished = False
    while not finished:
        try:
            batch = session.run(batch_op)  # an array of x's and y's
        except tf.errors.OutOfRangeError:
            print("Exhausted all data")
            break
        # The label array's leading dimension is the real size of this batch.
        fetched_so_far += batch[1].shape[0]
        finished = fetched_so_far >= records_to_fetch or time() - began_at > timeout_s
    print("Finished fetch in {}s and fetched {} records".format(time() - began_at, fetched_so_far))


def _data_fetch_for_search(config, reporter):
    """Ray Tune trial function that measures input-pipeline read throughput.

    Reads batches from ``DATA_FILES`` through ``tfrecords_reader`` until
    10000 records were counted or the dataset is exhausted, reporting progress
    to Tune after every batch and once more at the end.

    :param config: trial configuration supplied by Tune (not read here; the
        pipeline settings are hard-coded in this version)
    :param reporter: Tune reporter callable; invoked with ``samples_freq``,
        ``duration`` and ``samples_fetched`` keyword arguments
    """
    records_to_fetch = 10000
    sess = tf.keras.backend.get_session()
    records_fetched = 0

    dataset = tfrecords_reader(DATA_FILES,
                               parallel_file_reads=48,
                               batch_size=512,
                               num_parallel_data_fetch=24,
                               buffer_size=250,
                               prefetch_buffer=250)
    next_batch = dataset.make_one_shot_iterator().get_next()
    start = time()
    while True:
        # Only the fetch itself can signal end-of-data, so keep the try narrow.
        try:
            sess.run(next_batch)
        except tf.errors.OutOfRangeError:
            print("Exhausted all data after fetching {} records".format(records_fetched))
            break

        records_fetched += 512  # nominal batch size; the final batch may be smaller

        # ``duration`` must be computed before it is reported: previously it was
        # first assigned after the loop, so the first successful batch raised
        # UnboundLocalError.
        duration = time() - start
        reporter(samples_freq=records_fetched / duration, duration=duration, samples_fetched=records_fetched)
        if records_fetched >= records_to_fetch:
            break

    # Final report covering the whole run (also emitted when no batch was read).
    duration = time() - start
    reporter(samples_freq=records_fetched / duration, duration=duration, samples_fetched=records_fetched)


def ray_search(total_trails=80, experiment_name="nqs_input_pipeline"):
    """Launch a Ray Tune search over the input-pipeline parameters.

    Combines an AsyncHyperBand scheduler with a HyperOpt search algorithm,
    starts Ray and runs ``_data_fetch_for_search`` as the trial function.

    :param total_trails: number of trials (``num_samples``) to run
    :param experiment_name: key under which the experiment is registered
    """
    concurrent_trials = 1  # number of GPUs
    trial_time_limit_s = 60 * 60  # 1 hour
    cores = multiprocessing.cpu_count()
    results_dir = "./ray_search_results"

    scheduler = AsyncHyperBandScheduler(
        time_attr="time_total_s",
        reward_attr="duration",
        grace_period=1,
        max_t=trial_time_limit_s)

    # (name, low, high) for every tunable; all of them are meant to be integers.
    bounds = (
        ("batch_size", 128, 1024),
        ("num_parallel_data_fetch", cores / 2, cores * 2),
        ("buffer", 250, 2048),
        ("parallel_file_reads", cores / 2, cores * 2),
        ("prefetch_buffer_size", 250, 2048),
    )
    search_space = {}
    for label, low, high in bounds:
        search_space[label] = hp.uniform(label, low, high)

    trial_spec = {
        "run": _data_fetch_for_search,
        "stop": {"time_total_s": trial_time_limit_s},
        "num_samples": total_trails,
        "local_dir": results_dir,
        "max_failures": 2,
    }
    experiments = {experiment_name: trial_spec}

    searcher = HyperOptSearch(search_space, max_concurrent=concurrent_trials, reward_attr="duration")

    ray.init(redirect_output=True)
    run_experiments(experiments, scheduler=scheduler, search_alg=searcher, verbose=False)

All 12 comments

Hm, not sure from the initial looks of this.

Can you try turning test_tfrecords_reader into a remote function and then executing 2 copies of that in parallel?

Thanks @richardliaw for helping me. It works fine with the below code,

@ray.remote
def test_tfrecords_reader(config, reporter):
    """Remote-task variant of the reader test, used to check the pipeline inside a Ray worker.

    :param config: dict; ``records_to_fetch`` (default 500) sets how many records to read
    :param reporter: callable taking keyword arguments; called once at the end
    :return: the requested ``records_to_fetch`` target (not the number actually read)
    """
    records_to_fetch = config.get("records_to_fetch", 500)
    timeout_s = 60 * 30
    sess = tf.keras.backend.get_session()
    records_fetched = 0

    dataset = tfrecords_reader(DATA_FILES,
                               parallel_file_reads=48,
                               batch_size=512,
                               num_parallel_data_fetch=24,
                               buffer_size=250,
                               prefetch_buffer=250)

    next_batch = dataset.make_one_shot_iterator().get_next()
    start = time()
    while True:
        # The OutOfRangeError handler is deliberately disabled here so that an
        # exhausted or empty dataset surfaces as a task failure.
        # try:
        records = sess.run(next_batch)  # an array of x's and y's
        records_fetched += records[1].shape[0]
        if records_fetched >= records_to_fetch or time() - start > timeout_s:
            break
        # except tf.errors.OutOfRangeError:
        #     print("Exhausted all data")
        #     break
    print("Finished fetch in {}s and fetched {} records".format(time() - start, records_fetched))
    # Placeholder report: the duration value is hard-coded, not measured.
    reporter(duration=10)
    return records_to_fetch

if __name__ == '__main__':
    start = time()
    ray.init()
    # Launch two copies of the reader as remote tasks; the reporter is a no-op stand-in.
    t1 = test_tfrecords_reader.remote({"records_to_fetch": 1000}, lambda **kwargs: None)
    t2 = test_tfrecords_reader.remote({"records_to_fetch": 6000}, lambda **kwargs: None)
    # ray.get blocks until each task finishes. The task returns its requested
    # target, so these asserts confirm completion rather than the amount read.
    assert (ray.get(t1) >= 1000)
    assert (ray.get(t2) >= 6000)
    print("Took {}s to finish".format(time() - start))

OK thanks. Can you post the full output of the original failure?

It's kind of huge and I didn't want to strip anything out here. Please check the entire trace log file at https://drive.google.com/file/d/1DZsd7P8tIq3T2FZ76bbk56JM4L-HegDu/view?usp=sharing

The error is the same with PBT as well. Checked with both tensorflow versions 1.10 and 1.11 to eliminate any bugs due to tf.

Created a toy example with an attempt to reproduce the error at https://colab.research.google.com/drive/1Kl172GB0q-h7RIuiIhcUyV3wl6iFP6Fr

Here are my observations and I'm very confused now,

  1. It works fine in Google Colab environment
  2. Throws the same error on my Mac or Linux in a brand new virtual env as well
  3. Doesn't throw an error when run independently on Mac and Linux

The error is the same even in an Ubuntu Docker container that has only the required libraries. I'm not sure what the pattern is here.
Can someone please help?

Sorry for the late reply. This is so odd; it works on the colab environment for me too. This is a nice notebook BTW!

Do you have a version of the notebook that I can run on a local machine?

Thanks @richardliaw. I found the issue. During experiments, the current working directory is set to the folder in which the trial results are saved, not the directory in which the Python process was started. Because of this, TensorFlow couldn't find the input files, as I used relative paths in the tf.dataset API. In Google Colab I happened to use the same folder for the input data and for the Ray results dir, as it was a temp folder anyway. I think it would be helpful to note this in the documentation, if it isn't there already, or, if possible, to not change the cwd during trials.

If you think it would be a good example for others, feel free to add it somewhere in your documentation or repo :)

And here is the code that can run as a standalone python script, in an environment with numpy, pandas, ray, tensorflow, hyperopt installed.

# -*- coding: utf-8 -*-
"""ray-tune-with-tfrecords

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/1Kl172GB0q-h7RIuiIhcUyV3wl6iFP6Fr

##### Copyright 2018
"""

# MIT License

# Copyright (c) 2018 Nitin Pasumarthy

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""# Use Ray Tune with tf.dataset API"""

# !pip install --upgrade -q git+git://github.com/hyperopt/hyperopt.git
#
# !pip install -q ray

import json
import multiprocessing
import os
import traceback

import numpy as np
import pandas as pd
import ray
import tensorflow as tf
from hyperopt import hp
from ray.tune import run_experiments
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.suggest import HyperOptSearch
from tensorflow import keras
from time import time

"""## Prepare Data

Since we want to test reading TFRecords file, lets convert an existing keras dataset into that format
"""

# Load the Boston housing dataset bundled with Keras as train/test arrays.
(train_data, train_labels), (test_data, test_labels) = keras.datasets.boston_housing.load_data()

"""Create the train dataset"""

# Features as a 13-column frame; the label goes into a separate "Price" frame.
X_train = pd.DataFrame(train_data,
                       columns=['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B',
                                'LSTAT'])

# The bare .head() calls are leftover notebook cell previews; their result is
# discarded when this runs as a script.
X_train.head()

y_train = pd.DataFrame(train_labels, columns=["Price"])

y_train.head()

# Concatenate column-wise so that the label ends up as the last column.
train_df = pd.concat([X_train, y_train], axis=1)

train_df.head()

"""Repeat the same for test set. Here the word test is used interchangeably for validation set"""

X_test = pd.DataFrame(test_data,
                      columns=['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B',
                               'LSTAT'])
y_test = pd.DataFrame(test_labels, columns=["Price"])
test_df = pd.concat([X_test, y_test], axis=1)
test_df.head()

"""Define helpers for tensor features"""


def _int64_feature(value):
    """Wrap a single integer in a ``tf.train.Feature`` backed by an ``Int64List``."""
    int_list = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=int_list)


def _float_feature(value):
    """Wrap an iterable of floats in a ``tf.train.Feature`` backed by a ``FloatList``."""
    float_list = tf.train.FloatList(value=value)
    return tf.train.Feature(float_list=float_list)


"""Authenticate Colab to access your Google Drive - https://colab.research.google.com/notebooks/io.ipynb#scrollTo=u22w3BFiOveA. Need to re-authenticate every few hours"""

# from google.colab import drive
# drive.mount('/content/gdrive')

"""Create a folder called "tmp" in drive.google.com, and create all files there, so that its easy to delete the folder later"""

# Resolved to an absolute path once, at import time, while the working
# directory is still the one the script was started from.
data_folder = os.path.abspath('./data/') # IMPORTANT: choose absolute paths for all input files to be used in ray's environment

# Create the data folder on first run.
if not os.path.exists(data_folder):
    os.makedirs(data_folder)

"""A helper function which saves a numpy ndarray as a TFRecords file"""


def save_pd_df_to_tfrecords(dataframe, outfilename):
    """Write every row of ``dataframe`` as a ``tf.train.Example`` into a TFRecords file.

    All columns but the last are stored under feature ``'x'`` and the last
    column under feature ``'y'``.

    :param dataframe: pandas DataFrame whose last column is the label
    :param outfilename: file name, created inside ``data_folder``
    """
    target_path = os.path.join(data_folder, outfilename)
    with tf.python_io.TFRecordWriter(target_path) as record_writer:
        for values in dataframe.values:
            feature_map = {
                'x': _float_feature(values[:-1]),
                'y': _float_feature([values[-1]]),
            }
            record = tf.train.Example(features=tf.train.Features(feature=feature_map))
            record_writer.write(record.SerializeToString())


"""Let us save the TFRecords file to your drive. It's only 43KB"""

# Both files are written into data_folder every time the script runs.
save_pd_df_to_tfrecords(train_df, "train.tfrecords")
save_pd_df_to_tfrecords(test_df, "test.tfrecords")

"""## Input pipeline"""


def tfrecords_reader(data_files, parallel_file_reads, batch_size, num_parallel_data_fetch, buffer_size, prefetch_buffer,
                     is_training=False):
    """Build a ``tf.data`` pipeline that reads, decodes, batches and prefetches TFRecords.

    :param data_files: TFRecords file path(s) to read
    :param parallel_file_reads: passed as ``num_parallel_reads`` to ``TFRecordDataset``
    :param batch_size: number of examples per batch
    :param num_parallel_data_fetch: passed as ``num_parallel_batches`` to ``map_and_batch``
    :param buffer_size: shuffle buffer size (only used when ``is_training``)
    :param prefetch_buffer: number of batches to prefetch
    :param is_training: when true, shuffle and repeat the data before batching
    :return: dataset yielding ``(features, label)`` pairs, where ``features``
        is a dict holding ``"x"``
    """
    feature_spec = {
        "x": tf.FixedLenFeature([13], tf.float32),
        "y": tf.FixedLenFeature([1], tf.float32)
    }

    def decode(serialized_example):
        # Parse one serialized Example and split the label off the feature dict.
        parsed = tf.parse_single_example(serialized_example, features=feature_spec)
        target = parsed.pop("y")
        return parsed, target

    records = tf.data.TFRecordDataset(data_files, num_parallel_reads=parallel_file_reads)

    if is_training:
        # Positional args are (buffer_size, count, seed); presumably count=-1
        # repeats indefinitely -- confirm against the TF docs.
        shuffle_op = tf.contrib.data.shuffle_and_repeat(buffer_size, -1, 42)
        records = records.apply(shuffle_op)

    batch_op = tf.contrib.data.map_and_batch(decode, batch_size,
                                             num_parallel_batches=num_parallel_data_fetch,
                                             drop_remainder=False)
    batched = records.apply(batch_op)
    return batched.prefetch(prefetch_buffer)


"""## Ray Tune

Let us use ray tune to optimize the above input pipeline
"""

# Start Ray once at import time. Any failure is printed rather than raised --
# presumably so that re-running the notebook cell with Ray already running
# does not abort (TODO confirm).
try:
    ray.init(redirect_output=True)
except Exception:
    print("## Exception caught ##")
    traceback.print_exc()


def _parse_ray_results(results_folder, output_filename=None):
    """
    Parse and aggregate Ray Tune trial results into a pandas dataframe.

    Every ``<results_folder>/<trial>/result.json`` file is read as
    line-delimited JSON. Each line becomes one row, with the nested ``config``
    dict flattened into the row (config keys win on a name clash).

    :param results_folder: path to the folder where the results of the trials are saved
    :param output_filename: saves the dataframe as a CSV at this location if provided
    :return: aggregated dataframe of all trials' results (empty if nothing was found)
    """
    # Local import keeps this helper self-contained. The files are read with the
    # builtin ``open`` below, so only local paths ever worked and TensorFlow's
    # glob is not needed.
    import glob

    # Sorted so that the row order is deterministic across runs.
    results_files = sorted(glob.glob(os.path.join(results_folder, "*", "result.json")))
    print("Identified {} ray result files".format(len(results_files)))
    trials_results = []
    for path in results_files:
        with open(path, "r") as handle:
            for line in handle:
                if not line.strip():
                    continue  # tolerate blank lines, e.g. a trailing newline
                row = json.loads(line)
                # Flatten the config for better analysis. ``update`` works on
                # Python 2 and 3, whereas ``items() + items()`` is Python 2 only.
                row.update(row.pop("config", {}))
                trials_results.append(row)
    df = pd.DataFrame(trials_results)
    if output_filename:
        df.to_csv(output_filename, index=False)
        print("Saved results at {}".format(output_filename))
    return df


def _data_fetch_for_search(config, reporter):
    """Ray Tune trial function: build the pipeline from ``config`` and time reading from it.

    Reads batches until roughly 500 records were counted (or the dataset is
    exhausted), reporting to Tune after every batch and once more at the end.

    :param config: dict with ``batch_size``, ``num_parallel_data_fetch``, ``buffer``,
        ``parallel_file_reads``, ``prefetch_buffer_size`` (cast to int here) and
        ``data_files`` (paths handed to ``tfrecords_reader``)
    :param reporter: Tune reporter callable taking ``samples_freq``, ``duration``
        and ``samples_fetched`` keyword arguments
    """
    records_to_fetch = 500

    # The search space samples floats (hp.uniform), so truncate to integers.
    b = int(config["batch_size"])
    pf = int(config["num_parallel_data_fetch"])
    bu = int(config["buffer"])
    p = int(config["parallel_file_reads"])
    pb = int(config["prefetch_buffer_size"])

    data_files = config["data_files"]

    dataset = tfrecords_reader(data_files,
                               parallel_file_reads=p,
                               batch_size=b,
                               num_parallel_data_fetch=pf,
                               buffer_size=bu,
                               prefetch_buffer=pb,
                               is_training=True)

    # Timing starts before the iterator and session are created, so their
    # setup cost is included in the measured duration.
    start = time()
    next_batch = dataset.make_one_shot_iterator().get_next()
    records_fetched = 0

    with tf.Session() as sess:
        while True:
            try:
                records = sess.run(next_batch)  # an array of x's and y's
                # Counts the nominal batch size, not the size actually returned.
                records_fetched += b  # records[1].shape[0]

                duration = time() - start
                # NOTE(review): duration and samples_fetched are reported negated
                # here but positive in the final report below -- presumably so a
                # scheduler maximising "duration" prefers faster trials; confirm
                # which sign is intended.
                reporter(samples_freq=records_fetched / duration, duration=-duration, samples_fetched=-records_fetched)

                if records_fetched >= records_to_fetch:
                    break
            except tf.errors.OutOfRangeError:
                print("Exhausted all data after reading {} records".format(records_fetched))
                break

    # Final report for the whole run (positive values, unlike the in-loop reports).
    duration = time() - start
    reporter(samples_freq=records_fetched / duration, duration=duration, samples_fetched=records_fetched)


def ray_search(data_files, total_trials=80, experiment_name="input_pipeline"):
    """Run a Ray Tune search over the input-pipeline parameters and aggregate the results.

    Uses an AsyncHyperBand scheduler together with a HyperOpt search algorithm,
    runs ``_data_fetch_for_search`` as the trial function and finally writes
    ``agg_results.csv`` into the experiment's results folder.

    :param data_files: list of TFRecords paths handed to every trial via ``config``
    :param total_trials: number of trials (``num_samples``) to run
    :param experiment_name: name of the experiment, also the results sub-folder
    """
    num_parallel_trails = 4  # can be number of GPUs available
    max_time_for_trial_s = 60 * 60  # 1 hour
    cores = multiprocessing.cpu_count()
    ray_results_save_dir = os.path.join(data_folder, "ray_search_results")

    # Floor division keeps these bounds integral on Python 3 (identical to the
    # previous ``/`` on Python 2); ``max(1, ...)`` avoids a zero lower bound,
    # and hence zero parallelism, on a single-core machine.
    min_parallelism = max(1, cores // 2)
    max_parallelism = cores * 2

    # NOTE(review): the scheduler is driven by "duration" while the search
    # algorithm below optimises "samples_freq" -- confirm this is intended.
    ahb = AsyncHyperBandScheduler(
        time_attr="time_total_s",
        reward_attr="duration",
        grace_period=1,
        max_t=max_time_for_trial_s)

    # https://github.com/hyperopt/hyperopt/wiki/FMin#21-parameter-expressions
    space = {
        "batch_size": hp.uniform("batch_size", 128, 1024),
        "num_parallel_data_fetch": hp.uniform("num_parallel_data_fetch", min_parallelism, max_parallelism),
        "buffer": hp.uniform("buffer", 250, 2048),
        "parallel_file_reads": hp.uniform("parallel_file_reads", min_parallelism, max_parallelism),
        "prefetch_buffer_size": hp.uniform("prefetch_buffer_size", 250, 2048),
    }

    experiment_spec = {
        experiment_name: {
            "run": _data_fetch_for_search,
            "stop": {
                # "training_iteration": 100,
                "time_total_s": max_time_for_trial_s
            },
            "trial_resources": {
                "cpu": max(1, cores // num_parallel_trails),
                "gpu": 0
            },
            # These values are drawn once, when the spec is built.
            "config": {
                "data_files": data_files,
                "batch_size": np.random.randint(128, 1024),
                "num_parallel_data_fetch": np.random.randint(min_parallelism, max_parallelism),
                "buffer": np.random.randint(250, 2048),
                "parallel_file_reads": np.random.randint(min_parallelism, max_parallelism),
                "prefetch_buffer_size": np.random.randint(250, 2048),
            },
            "num_samples": total_trials,
            "local_dir": ray_results_save_dir,
            "max_failures": 2
        }
    }

    algo = HyperOptSearch(space, max_concurrent=num_parallel_trails, reward_attr="samples_freq")
    run_experiments(experiment_spec, scheduler=ahb, search_alg=algo, verbose=False)  # with HyperOpt space
    #   run_experiments(experiment_spec, scheduler=ahb, verbose=False) # with Random space

    experiment_dir = os.path.join(ray_results_save_dir, experiment_name)
    _parse_ray_results(experiment_dir, os.path.join(experiment_dir, "agg_results.csv"))


def main(run_with_ray=True):
    """Run the pipeline benchmark, either as a Ray Tune search or as one plain local call.

    :param run_with_ray: when true, run a single-trial Tune search; otherwise
        call ``_data_fetch_for_search`` directly with a random configuration
        and a reporter that just prints what it receives
    """
    data_files = [os.path.join(data_folder, "train.tfrecords"), os.path.join(data_folder, "test.tfrecords")]

    if run_with_ray:
        ray_search(data_files, total_trials=1)
        return

    # Run independently without Ray

    def reporter_mock(**kwargs):
        print("call to reporter: {}\n".format(kwargs))

    cores = multiprocessing.cpu_count()
    # Floor division keeps the bounds integral on Python 3 (same result as the
    # previous ``/`` on Python 2); ``max(1, ...)`` avoids a zero lower bound on
    # a single-core machine.
    min_parallelism = max(1, cores // 2)
    max_parallelism = cores * 2

    _data_fetch_for_search({
        "data_files": data_files,
        "batch_size": np.random.randint(128, 1024),
        "num_parallel_data_fetch": np.random.randint(min_parallelism, max_parallelism),
        "buffer": np.random.randint(250, 2048),
        "parallel_file_reads": np.random.randint(min_parallelism, max_parallelism),
        "prefetch_buffer_size": np.random.randint(250, 2048),
    }, reporter_mock)


# Script entry point: main() defaults to run_with_ray=True.
if __name__ == '__main__':
    main()

Awesome! Glad it worked out.

BTW, you can do ray.init(ignore_reinit_error=True) to achieve the same
effect as your try-catch.

On Fri, Oct 12, 2018 at 11:45 AM Nitin Pasumarthy notifications@github.com
wrote:

And here is the code that can run as a standalone python script, in an
environment with numpy, pandas, ray, tensorflow, hyperopt installed.

# -*- coding: utf-8 -*-
"""ray-tune-with-tfrecords

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/1Kl172GB0q-h7RIuiIhcUyV3wl6iFP6Fr

##### Copyright 2018
"""

MIT License

Copyright (c) 2018 Nitin Pasumarthy

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

"""# Use Ray Tune with tf.dataset API"""

# !pip install --upgrade -q git+git://github.com/hyperopt/hyperopt.git
#
# !pip install -q ray

import jsonimport multiprocessingimport osimport traceback
import numpy as npimport pandas as pdimport rayimport tensorflow as tffrom hyperopt import hpfrom ray.tune import run_experimentsfrom ray.tune.schedulers import AsyncHyperBandSchedulerfrom ray.tune.suggest import HyperOptSearchfrom tensorflow import kerasfrom time import time
"""## Prepare DataSince we want to test reading TFRecords file, lets convert an existing keras dataset into that format"""

(train_data, train_labels), (test_data, test_labels) = keras.datasets.boston_housing.load_data()
"""Create the train dataset"""

X_train = pd.DataFrame(train_data,
columns=['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B',
'LSTAT'])

X_train.head()

y_train = pd.DataFrame(train_labels, columns=["Price"])

y_train.head()

train_df = pd.concat([X_train, y_train], axis=1)

train_df.head()
"""Repeat the same for test set. Here the word test is used interchangeably for validation set"""

X_test = pd.DataFrame(test_data,
columns=['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B',
'LSTAT'])
y_test = pd.DataFrame(test_labels, columns=["Price"])
test_df = pd.concat([X_test, y_test], axis=1)
test_df.head()
"""Define helpers for tensor features"""

def _int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def _float_feature(value):
return tf.train.Feature(float_list=tf.train.FloatList(value=value))

"""Authenticate Colab to access your Google Drive - https://colab.research.google.com/notebooks/io.ipynb#scrollTo=u22w3BFiOveA. Need to re-authenticate every few hours"""

# from google.colab import drive
# drive.mount('/content/gdrive')

"""Create a folder called "tmp" in drive.google.com, and create all files there, so that its easy to delete the folder later"""

data_folder = os.path.abspath('./data/') # IMPORTANT: choose absolute paths for all input files to be used in ray's environment
if not os.path.exists(data_folder):
os.makedirs(data_folder)
"""A helper function which saves a numpy ndarray as a TFRecords file"""

def save_pd_df_to_tfrecords(dataframe, outfilename):
with tf.python_io.TFRecordWriter(os.path.join(data_folder, outfilename)) as writer:
for row in dataframe.values:
example = tf.train.Example(
features=tf.train.Features(feature={
'x': _float_feature(row[:-1]),
'y': _float_feature([row[-1]])
})
)
writer.write(example.SerializeToString())

"""Let us save the TFRecords file to your drive. It's only 43KB"""

save_pd_df_to_tfrecords(train_df, "train.tfrecords")
save_pd_df_to_tfrecords(test_df, "test.tfrecords")
"""## Input pipeline"""

def tfrecords_reader(data_files, parallel_file_reads, batch_size, num_parallel_data_fetch, buffer_size, prefetch_buffer,
is_training=False):
def decode(serialized_example):
features = tf.parse_single_example(serialized_example, features=feature_tensors)
label = features.pop("y")
return features, label

feature_tensors = {
    "x": tf.FixedLenFeature([13], tf.float32),
    "y": tf.FixedLenFeature([1], tf.float32)
}
dataset = tf.data.TFRecordDataset(data_files, num_parallel_reads=parallel_file_reads)

if is_training:
    dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(buffer_size, -1, 42))

dataset = dataset.apply(tf.contrib.data.map_and_batch(decode, batch_size,
                                                      num_parallel_batches=num_parallel_data_fetch,
                                                      drop_remainder=False)
                        )
dataset = dataset.prefetch(prefetch_buffer)

return dataset

"""## Ray TuneLet us use ray tune to optimize the above input pipeline"""
try:
ray.init(redirect_output=True)except Exception:
print("## Exception caught ##")
traceback.print_exc()

def _parse_ray_results(results_folder, output_filename=None):
""" Parses & aggregates ray's trails results as a pandas dataframe :param results_folder: path to the folder where the results of the trails are saved :param output_filename: saves the dataframe as a CSV at this location if provided :return aggregated dataframe of all trail's results """
results_files = tf.gfile.Glob(os.path.join(results_folder, "*", "result.json"))
print("Identified {} ray result files".format(len(results_files)))
trails_results = []
for f in results_files:
with open(f, "r") as file:
for line in file:
result = json.loads(line)
config = result.pop("config") # flatten the config for better analysis
trails_results.append(dict(result.items() + config.items()))
df = pd.DataFrame(trails_results)
if output_filename:
df.to_csv(output_filename, index=False)
print("Saved results at {}".format(output_filename))
return df

def _data_fetch_for_search(config, reporter):
records_to_fetch = 500

b = int(config["batch_size"])
pf = int(config["num_parallel_data_fetch"])
bu = int(config["buffer"])
p = int(config["parallel_file_reads"])
pb = int(config["prefetch_buffer_size"])

data_files = config["data_files"]

dataset = tfrecords_reader(data_files,
                           parallel_file_reads=p,
                           batch_size=b,
                           num_parallel_data_fetch=pf,
                           buffer_size=bu,
                           prefetch_buffer=pb,
                           is_training=True)

start = time()
next_batch = dataset.make_one_shot_iterator().get_next()
records_fetched = 0

with tf.Session() as sess:
    while True:
        try:
            records = sess.run(next_batch)  # an array of x's and y's
            records_fetched += b  # records[1].shape[0]

            duration = time() - start
            reporter(samples_freq=records_fetched / duration, duration=-duration, samples_fetched=-records_fetched)

            if records_fetched >= records_to_fetch:
                break
        except tf.errors.OutOfRangeError:
            print("Exhausted all data after reading {} records".format(records_fetched))
            break

duration = time() - start
reporter(samples_freq=records_fetched / duration, duration=duration, samples_fetched=records_fetched)

def ray_search(data_files, total_trials=80, experiment_name="input_pipeline"):
num_parallel_trails = 4 # can be number of GPUs available
max_time_for_trial_s = 60 * 60 # 1 hour
cores = multiprocessing.cpu_count()
ray_results_save_dir = os.path.join(data_folder, "ray_search_results")

ahb = AsyncHyperBandScheduler(
    time_attr="time_total_s",
    reward_attr="duration",
    grace_period=1,
    max_t=max_time_for_trial_s)

# https://github.com/hyperopt/hyperopt/wiki/FMin#21-parameter-expressions
space = {
    "batch_size": hp.uniform("batch_size", 128, 1024),
    "num_parallel_data_fetch": hp.uniform("num_parallel_data_fetch", cores / 2, cores * 2),
    "buffer": hp.uniform("buffer", 250, 2048),
    "parallel_file_reads": hp.uniform("parallel_file_reads", cores / 2, cores * 2),
    "prefetch_buffer_size": hp.uniform("prefetch_buffer_size", 250, 2048),
}

experiment_spec = {
    experiment_name: {
        "run": _data_fetch_for_search,
        "stop": {
            # "training_iteration": 100,
            "time_total_s": max_time_for_trial_s
        },
        "trial_resources": {
            "cpu": max(1, cores / num_parallel_trails),
            "gpu": 0
        },
        "config": {
            "data_files": data_files,
            "batch_size": np.random.randint(128, 1024),
            "num_parallel_data_fetch": np.random.randint(cores / 2, cores * 2),
            "buffer": np.random.randint(250, 2048),
            "parallel_file_reads": np.random.randint(cores / 2, cores * 2),
            "prefetch_buffer_size": np.random.randint(250, 2048),
        },
        "num_samples": total_trials,
        "local_dir": ray_results_save_dir,
        "max_failures": 2
    }
}

algo = HyperOptSearch(space, max_concurrent=num_parallel_trails, reward_attr="samples_freq")
run_experiments(experiment_spec, scheduler=ahb, search_alg=algo, verbose=False)  # with HyperOpt space
#   run_experiments(experiment_spec, scheduler=ahb, verbose=False) # with Random space

_parse_ray_results(os.path.join(ray_results_save_dir, experiment_name),
                   os.path.join(ray_results_save_dir, experiment_name, "agg_results.csv"))

def main(run_with_ray=True):
if run_with_ray:
ray_search([os.path.join(data_folder, "train.tfrecords"), os.path.join(data_folder, "test.tfrecords")],
total_trials=1)
else:

    """Run independently without Ray"""

    def reporter_mock(**kwargs):
        print("call to reporter: {}\n".format(kwargs))

    data_files = [os.path.join(data_folder, "train.tfrecords"), os.path.join(data_folder, "test.tfrecords")]
    cores = multiprocessing.cpu_count()

    _data_fetch_for_search({
        "data_files": data_files,
        "batch_size": np.random.randint(128, 1024),
        "num_parallel_data_fetch": np.random.randint(cores / 2, cores * 2),
        "buffer": np.random.randint(250, 2048),
        "parallel_file_reads": np.random.randint(cores / 2, cores * 2),
        "prefetch_buffer_size": np.random.randint(250, 2048),
    }, reporter_mock)

if __name__ == '__main__':
main()

—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
https://github.com/ray-project/ray/issues/3036#issuecomment-429422756,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AEUc5Wloko7nqCxwxWvUbdsSIix3INOmks5ukONegaJpZM4XOhIK
.

Oh good to know. One small question,
what is a good way to pass non-search-space data to the trial function (_data_fetch_for_search above)? For example, _data_fetch_for_search above uses the DATA_FILES global variable, which is a list of strings. Similarly, I have more complex types like nested dictionaries. I want to avoid using global variables. I tried using config["data_files"], but it didn't feel clean and did not work for passing complex types. I didn't want to refactor everything into a Trainable class just for passing arguments.

Was this page helpful?
0 / 5 - 0 ratings