I'm trying to optimize my tf.dataset input pipeline using Ray tune. A tf.errors.OutOfRangeError error is thrown by tensorflow when used in Ray's environment, but works fine when ran independently. In the source code below, test_tfrecords_reader when run independently from main works fine, whereas the same code when run with ray experiment in _data_fetch_for_search fails with tf.errors.OutOfRangeError with zero records read. What am I missing here?
def test_tfrecords_reader(records_to_fetch=10000, timeout_s=60 * 30):
sess = tf.keras.backend.get_session()
records_fetched = 0
dataset = tfrecords_reader(DATA_FILES,
parallel_file_reads=48,
batch_size=512,
num_parallel_data_fetch=24,
buffer_size=250,
prefetch_buffer=250)
next_batch = dataset.make_one_shot_iterator().get_next()
start = time()
while True:
try:
records = sess.run(next_batch) # an array of x's and y's
records_fetched += records[1].shape[0]
if records_fetched >= records_to_fetch or time() - start > timeout_s:
break
except tf.errors.OutOfRangeError:
print("Exhausted all data")
break
print("Finished fetch in {}s and fetched {} records".format(time() - start, records_fetched))
def _data_fetch_for_search(config, reporter):
records_to_fetch = 10000
sess = tf.keras.backend.get_session()
records_fetched = 0
dataset = tfrecords_reader(DATA_FILES,
parallel_file_reads=48,
batch_size=512,
num_parallel_data_fetch=24,
buffer_size=250,
prefetch_buffer=250)
next_batch = dataset.make_one_shot_iterator().get_next()
start = time()
while True:
try:
records = sess.run(next_batch) # an array of x's and y's
records_fetched += 512 # records[1].shape[0]
reporter(samples_freq=records_fetched / duration, duration=duration, samples_fetched=records_fetched)
if records_fetched >= records_to_fetch:
break
except tf.errors.OutOfRangeError:
print("Exhausted all data after fetching {} records".format(records_fetched))
break
duration = time() - start
reporter(samples_freq=records_fetched / duration, duration=duration, samples_fetched=records_fetched)
def ray_search(total_trails=80, experiment_name="nqs_input_pipeline"):
num_parallel_trails = 1 # number of GPUs
max_time_for_trial_s = 60 * 60 # 1 hour
cores = multiprocessing.cpu_count()
ray_results_save_dir = "./ray_search_results"
ahb = AsyncHyperBandScheduler(
time_attr="time_total_s",
reward_attr="duration",
grace_period=1,
max_t=max_time_for_trial_s)
space = {
"batch_size": hp.uniform("batch_size", 128, 1024), # integer only
"num_parallel_data_fetch": hp.uniform("num_parallel_data_fetch", cores / 2, cores * 2), # integer only
"buffer": hp.uniform("buffer", 250, 2048), # integer only
"parallel_file_reads": hp.uniform("parallel_file_reads", cores / 2, cores * 2), # integer only
"prefetch_buffer_size": hp.uniform("prefetch_buffer_size", 250, 2048),
}
experiment_spec = {
experiment_name: {
"run": _data_fetch_for_search,
"stop": {
"time_total_s": max_time_for_trial_s
},
"num_samples": total_trails,
"local_dir": ray_results_save_dir,
"max_failures": 2
}
}
algo = HyperOptSearch(space, max_concurrent=num_parallel_trails, reward_attr="duration")
ray.init(redirect_output=True)
run_experiments(experiment_spec, scheduler=ahb, search_alg=algo, verbose=False)
Hm, not sure from the initial looks of this.
Can you try turning test_tfrecords_reader into a remote function and then executing 2 copies of that in parallel?
Thanks @richardliaw for helping me. It works fine with the below code,
@ray.remote
def test_tfrecords_reader(config, reporter):
records_to_fetch = config.get("records_to_fetch", 500)
timeout_s = 60 * 30
sess = tf.keras.backend.get_session()
records_fetched = 0
dataset = tfrecords_reader(DATA_FILES,
parallel_file_reads=48,
batch_size=512,
num_parallel_data_fetch=24,
buffer_size=250,
prefetch_buffer=250)
next_batch = dataset.make_one_shot_iterator().get_next()
start = time()
while True:
# try:
records = sess.run(next_batch) # an array of x's and y's
records_fetched += records[1].shape[0]
if records_fetched >= records_to_fetch or time() - start > timeout_s:
break
# except tf.errors.OutOfRangeError:
# print("Exhausted all data")
# break
print("Finished fetch in {}s and fetched {} records".format(time() - start, records_fetched))
reporter(duration=10)
return records_to_fetch
if __name__ == '__main__':
start = time()
ray.init()
t1 = test_tfrecords_reader.remote({"records_to_fetch": 1000}, lambda **kwargs: None)
t2 = test_tfrecords_reader.remote({"records_to_fetch": 6000}, lambda **kwargs: None)
assert (ray.get(t1) >= 1000)
assert (ray.get(t2) >= 6000)
print("Took {}s to finish".format(time() - start))
OK thanks. Can you post the full output of the original failure?
Its kind of huge and didn't want to strip anything out here. Please check the entire trace log file at https://drive.google.com/file/d/1DZsd7P8tIq3T2FZ76bbk56JM4L-HegDu/view?usp=sharing
The error is the same with PBT as well. Checked with both tensorflow versions 1.10 and 1.11 to eliminate any bugs due to tf.
Created a toy example with an attempt to reproduce the error at https://colab.research.google.com/drive/1Kl172GB0q-h7RIuiIhcUyV3wl6iFP6Fr
Here are my observations and I'm very confused now,
The error is the same even in a ubuntu docker container having only required libraries. Not sure whats the pattern here.
Can someone please help?
Sorry for the late reply. This is so odd; it works on the colab environment for me too. This is a nice notebook BTW!
Do you have a version of the notebook that I can run on a local machine?
Thanks @richardliaw . I found the issue. The current folder during experiments is set to folder in which the trial results are saved and not the directory in which the python process started. Due to this tensorflow couldn't find the input files as I used relative paths in the tf.dataset API. In Google Colab, I used the same folder for input data and also for ray results dir, by coincidence, as it was a temp folder anyway. I think it might be helpful to note this in the documentation if not already or if possible not change cwd during the trials.
If you think it would be a good example for others, feel free to add it some where in your documentation or repo :)
And here is the code that can run as a standalone python script, in an environment with numpy, pandas, ray, tensorflow, hyperopt installed.
# -*- coding: utf-8 -*-
"""ray-tune-with-tfrecords
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1Kl172GB0q-h7RIuiIhcUyV3wl6iFP6Fr
##### Copyright 2018
"""
# MIT License
# Copyright (c) 2018 Nitin Pasumarthy
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""# Use Ray Tune with tf.dataset API"""
# !pip install --upgrade -q git+git://github.com/hyperopt/hyperopt.git
#
# !pip install -q ray
import json
import multiprocessing
import os
import traceback
import numpy as np
import pandas as pd
import ray
import tensorflow as tf
from hyperopt import hp
from ray.tune import run_experiments
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.suggest import HyperOptSearch
from tensorflow import keras
from time import time
"""## Prepare Data
Since we want to test reading TFRecords file, lets convert an existing keras dataset into that format
"""
(train_data, train_labels), (test_data, test_labels) = keras.datasets.boston_housing.load_data()
"""Create the train dataset"""
X_train = pd.DataFrame(train_data,
columns=['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B',
'LSTAT'])
X_train.head()
y_train = pd.DataFrame(train_labels, columns=["Price"])
y_train.head()
train_df = pd.concat([X_train, y_train], axis=1)
train_df.head()
"""Repeat the same for test set. Here the word test is used interchangeably for validation set"""
X_test = pd.DataFrame(test_data,
columns=['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B',
'LSTAT'])
y_test = pd.DataFrame(test_labels, columns=["Price"])
test_df = pd.concat([X_test, y_test], axis=1)
test_df.head()
"""Define helpers for tensor features"""
def _int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def _float_feature(value):
return tf.train.Feature(float_list=tf.train.FloatList(value=value))
"""Authenticate Colab to access your Google Drive - https://colab.research.google.com/notebooks/io.ipynb#scrollTo=u22w3BFiOveA. Need to re-authenticate every few hours"""
# from google.colab import drive
# drive.mount('/content/gdrive')
"""Create a folder called "tmp" in drive.google.com, and create all files there, so that its easy to delete the folder later"""
data_folder = os.path.abspath('./data/') # IMPORTANT: choose absolute paths for all input files to be used in ray's environment
if not os.path.exists(data_folder):
os.makedirs(data_folder)
"""A helper function which saves a numpy ndarray as a TFRecords file"""
def save_pd_df_to_tfrecords(dataframe, outfilename):
with tf.python_io.TFRecordWriter(os.path.join(data_folder, outfilename)) as writer:
for row in dataframe.values:
example = tf.train.Example(
features=tf.train.Features(feature={
'x': _float_feature(row[:-1]),
'y': _float_feature([row[-1]])
})
)
writer.write(example.SerializeToString())
"""Let us save the TFRecords file to your drive. It's only 43KB"""
save_pd_df_to_tfrecords(train_df, "train.tfrecords")
save_pd_df_to_tfrecords(test_df, "test.tfrecords")
"""## Input pipeline"""
def tfrecords_reader(data_files, parallel_file_reads, batch_size, num_parallel_data_fetch, buffer_size, prefetch_buffer,
is_training=False):
def decode(serialized_example):
features = tf.parse_single_example(serialized_example, features=feature_tensors)
label = features.pop("y")
return features, label
feature_tensors = {
"x": tf.FixedLenFeature([13], tf.float32),
"y": tf.FixedLenFeature([1], tf.float32)
}
dataset = tf.data.TFRecordDataset(data_files, num_parallel_reads=parallel_file_reads)
if is_training:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(buffer_size, -1, 42))
dataset = dataset.apply(tf.contrib.data.map_and_batch(decode, batch_size,
num_parallel_batches=num_parallel_data_fetch,
drop_remainder=False)
)
dataset = dataset.prefetch(prefetch_buffer)
return dataset
"""## Ray Tune
Let us use ray tune to optimize the above input pipeline
"""
try:
ray.init(redirect_output=True)
except Exception:
print("## Exception caught ##")
traceback.print_exc()
def _parse_ray_results(results_folder, output_filename=None):
"""
Parses & aggregates ray's trails results as a pandas dataframe
:param results_folder: path to the folder where the results of the trails are saved
:param output_filename: saves the dataframe as a CSV at this location if provided
:return aggregated dataframe of all trail's results
"""
results_files = tf.gfile.Glob(os.path.join(results_folder, "*", "result.json"))
print("Identified {} ray result files".format(len(results_files)))
trails_results = []
for f in results_files:
with open(f, "r") as file:
for line in file:
result = json.loads(line)
config = result.pop("config") # flatten the config for better analysis
trails_results.append(dict(result.items() + config.items()))
df = pd.DataFrame(trails_results)
if output_filename:
df.to_csv(output_filename, index=False)
print("Saved results at {}".format(output_filename))
return df
def _data_fetch_for_search(config, reporter):
records_to_fetch = 500
b = int(config["batch_size"])
pf = int(config["num_parallel_data_fetch"])
bu = int(config["buffer"])
p = int(config["parallel_file_reads"])
pb = int(config["prefetch_buffer_size"])
data_files = config["data_files"]
dataset = tfrecords_reader(data_files,
parallel_file_reads=p,
batch_size=b,
num_parallel_data_fetch=pf,
buffer_size=bu,
prefetch_buffer=pb,
is_training=True)
start = time()
next_batch = dataset.make_one_shot_iterator().get_next()
records_fetched = 0
with tf.Session() as sess:
while True:
try:
records = sess.run(next_batch) # an array of x's and y's
records_fetched += b # records[1].shape[0]
duration = time() - start
reporter(samples_freq=records_fetched / duration, duration=-duration, samples_fetched=-records_fetched)
if records_fetched >= records_to_fetch:
break
except tf.errors.OutOfRangeError:
print("Exhausted all data after reading {} records".format(records_fetched))
break
duration = time() - start
reporter(samples_freq=records_fetched / duration, duration=duration, samples_fetched=records_fetched)
def ray_search(data_files, total_trials=80, experiment_name="input_pipeline"):
num_parallel_trails = 4 # can be number of GPUs available
max_time_for_trial_s = 60 * 60 # 1 hour
cores = multiprocessing.cpu_count()
ray_results_save_dir = os.path.join(data_folder, "ray_search_results")
ahb = AsyncHyperBandScheduler(
time_attr="time_total_s",
reward_attr="duration",
grace_period=1,
max_t=max_time_for_trial_s)
# https://github.com/hyperopt/hyperopt/wiki/FMin#21-parameter-expressions
space = {
"batch_size": hp.uniform("batch_size", 128, 1024),
"num_parallel_data_fetch": hp.uniform("num_parallel_data_fetch", cores / 2, cores * 2),
"buffer": hp.uniform("buffer", 250, 2048),
"parallel_file_reads": hp.uniform("parallel_file_reads", cores / 2, cores * 2),
"prefetch_buffer_size": hp.uniform("prefetch_buffer_size", 250, 2048),
}
experiment_spec = {
experiment_name: {
"run": _data_fetch_for_search,
"stop": {
# "training_iteration": 100,
"time_total_s": max_time_for_trial_s
},
"trial_resources": {
"cpu": max(1, cores / num_parallel_trails),
"gpu": 0
},
"config": {
"data_files": data_files,
"batch_size": np.random.randint(128, 1024),
"num_parallel_data_fetch": np.random.randint(cores / 2, cores * 2),
"buffer": np.random.randint(250, 2048),
"parallel_file_reads": np.random.randint(cores / 2, cores * 2),
"prefetch_buffer_size": np.random.randint(250, 2048),
},
"num_samples": total_trials,
"local_dir": ray_results_save_dir,
"max_failures": 2
}
}
algo = HyperOptSearch(space, max_concurrent=num_parallel_trails, reward_attr="samples_freq")
run_experiments(experiment_spec, scheduler=ahb, search_alg=algo, verbose=False) # with HyperOpt space
# run_experiments(experiment_spec, scheduler=ahb, verbose=False) # with Random space
_parse_ray_results(os.path.join(ray_results_save_dir, experiment_name),
os.path.join(ray_results_save_dir, experiment_name, "agg_results.csv"))
def main(run_with_ray=True):
if run_with_ray:
ray_search([os.path.join(data_folder, "train.tfrecords"), os.path.join(data_folder, "test.tfrecords")],
total_trials=1)
else:
"""Run independently without Ray"""
def reporter_mock(**kwargs):
print("call to reporter: {}\n".format(kwargs))
data_files = [os.path.join(data_folder, "train.tfrecords"), os.path.join(data_folder, "test.tfrecords")]
cores = multiprocessing.cpu_count()
_data_fetch_for_search({
"data_files": data_files,
"batch_size": np.random.randint(128, 1024),
"num_parallel_data_fetch": np.random.randint(cores / 2, cores * 2),
"buffer": np.random.randint(250, 2048),
"parallel_file_reads": np.random.randint(cores / 2, cores * 2),
"prefetch_buffer_size": np.random.randint(250, 2048),
}, reporter_mock)
if __name__ == '__main__':
main()
Awesome! Glad it worked out.
BTW, you can do ray.init(ignore_reinit_error=True) to achieve the same
effect as your try-catch.
On Fri, Oct 12, 2018 at 11:45 AM Nitin Pasumarthy notifications@github.com
wrote:
And here is the code that can run as a standalone python script, in an
environment with numpy, pandas, ray, tensorflow, hyperopt installed.-- coding: utf-8 --"""ray-tune-with-tfrecordsAutomatically generated by Colaboratory.Original file is located at https://colab.research.google.com/drive/1Kl172GB0q-h7RIuiIhcUyV3wl6iFP6Fr##### Copyright 2018"""
MIT License
Copyright (c) 2018 Nitin Pasumarthy
Permission is hereby granted, free of charge, to any person obtaining a copy# of this software and associated documentation files (the "Software"), to deal# in the Software without restriction, including without limitation the rights# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell# copies of the Software, and to permit persons to whom the Software is# furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all# copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE# SOFTWARE.
"""# Use Ray Tune with tf.dataset API"""
!pip install --upgrade -q git+git://github.com/hyperopt/hyperopt.git## !pip install -q ray
import jsonimport multiprocessingimport osimport traceback
import numpy as npimport pandas as pdimport rayimport tensorflow as tffrom hyperopt import hpfrom ray.tune import run_experimentsfrom ray.tune.schedulers import AsyncHyperBandSchedulerfrom ray.tune.suggest import HyperOptSearchfrom tensorflow import kerasfrom time import time
"""## Prepare DataSince we want to test reading TFRecords file, lets convert an existing keras dataset into that format"""(train_data, train_labels), (test_data, test_labels) = keras.datasets.boston_housing.load_data()
"""Create the train dataset"""X_train = pd.DataFrame(train_data,
columns=['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B',
'LSTAT'])X_train.head()
y_train = pd.DataFrame(train_labels, columns=["Price"])
y_train.head()
train_df = pd.concat([X_train, y_train], axis=1)
train_df.head()
"""Repeat the same for test set. Here the word test is used interchangeably for validation set"""X_test = pd.DataFrame(test_data,
columns=['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B',
'LSTAT'])
y_test = pd.DataFrame(test_labels, columns=["Price"])
test_df = pd.concat([X_test, y_test], axis=1)
test_df.head()
"""Define helpers for tensor features"""def _int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))def _float_feature(value):
return tf.train.Feature(float_list=tf.train.FloatList(value=value))"""Authenticate Colab to access your Google Drive - https://colab.research.google.com/notebooks/io.ipynb#scrollTo=u22w3BFiOveA. Need to re-authenticate every few hours"""
from google.colab import drive# drive.mount('/content/gdrive')
"""Create a folder called "tmp" in drive.google.com, and create all files there, so that its easy to delete the folder later"""
data_folder = os.path.abspath('./data/') # IMPORTANT: choose absolute paths for all input files to be used in ray's environment
if not os.path.exists(data_folder):
os.makedirs(data_folder)
"""A helper function which saves a numpy ndarray as a TFRecords file"""def save_pd_df_to_tfrecords(dataframe, outfilename):
with tf.python_io.TFRecordWriter(os.path.join(data_folder, outfilename)) as writer:
for row in dataframe.values:
example = tf.train.Example(
features=tf.train.Features(feature={
'x': _float_feature(row[:-1]),
'y': _float_feature([row[-1]])
})
)
writer.write(example.SerializeToString())"""Let us save the TFRecords file to your drive. It's only 43KB"""
save_pd_df_to_tfrecords(train_df, "train.tfrecords")
save_pd_df_to_tfrecords(test_df, "test.tfrecords")
"""## Input pipeline"""def tfrecords_reader(data_files, parallel_file_reads, batch_size, num_parallel_data_fetch, buffer_size, prefetch_buffer,
is_training=False):
def decode(serialized_example):
features = tf.parse_single_example(serialized_example, features=feature_tensors)
label = features.pop("y")
return features, labelfeature_tensors = { "x": tf.FixedLenFeature([13], tf.float32), "y": tf.FixedLenFeature([1], tf.float32) } dataset = tf.data.TFRecordDataset(data_files, num_parallel_reads=parallel_file_reads) if is_training: dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(buffer_size, -1, 42)) dataset = dataset.apply(tf.contrib.data.map_and_batch(decode, batch_size, num_parallel_batches=num_parallel_data_fetch, drop_remainder=False) ) dataset = dataset.prefetch(prefetch_buffer) return dataset"""## Ray TuneLet us use ray tune to optimize the above input pipeline"""
try:
ray.init(redirect_output=True)except Exception:
print("## Exception caught ##")
traceback.print_exc()def _parse_ray_results(results_folder, output_filename=None):
""" Parses & aggregates ray's trails results as a pandas dataframe :param results_folder: path to the folder where the results of the trails are saved :param output_filename: saves the dataframe as a CSV at this location if provided :return aggregated dataframe of all trail's results """
results_files = tf.gfile.Glob(os.path.join(results_folder, "*", "result.json"))
print("Identified {} ray result files".format(len(results_files)))
trails_results = []
for f in results_files:
with open(f, "r") as file:
for line in file:
result = json.loads(line)
config = result.pop("config") # flatten the config for better analysis
trails_results.append(dict(result.items() + config.items()))
df = pd.DataFrame(trails_results)
if output_filename:
df.to_csv(output_filename, index=False)
print("Saved results at {}".format(output_filename))
return dfdef _data_fetch_for_search(config, reporter):
records_to_fetch = 500b = int(config["batch_size"]) pf = int(config["num_parallel_data_fetch"]) bu = int(config["buffer"]) p = int(config["parallel_file_reads"]) pb = int(config["prefetch_buffer_size"]) data_files = config["data_files"] dataset = tfrecords_reader(data_files, parallel_file_reads=p, batch_size=b, num_parallel_data_fetch=pf, buffer_size=bu, prefetch_buffer=pb, is_training=True) start = time() next_batch = dataset.make_one_shot_iterator().get_next() records_fetched = 0 with tf.Session() as sess: while True: try: records = sess.run(next_batch) # an array of x's and y's records_fetched += b # records[1].shape[0] duration = time() - start reporter(samples_freq=records_fetched / duration, duration=-duration, samples_fetched=-records_fetched) if records_fetched >= records_to_fetch: break except tf.errors.OutOfRangeError: print("Exhausted all data after reading {} records".format(records_fetched)) break duration = time() - start reporter(samples_freq=records_fetched / duration, duration=duration, samples_fetched=records_fetched)def ray_search(data_files, total_trials=80, experiment_name="input_pipeline"):
num_parallel_trails = 4 # can be number of GPUs available
max_time_for_trial_s = 60 * 60 # 1 hour
cores = multiprocessing.cpu_count()
ray_results_save_dir = os.path.join(data_folder, "ray_search_results")ahb = AsyncHyperBandScheduler( time_attr="time_total_s", reward_attr="duration", grace_period=1, max_t=max_time_for_trial_s) # https://github.com/hyperopt/hyperopt/wiki/FMin#21-parameter-expressions space = { "batch_size": hp.uniform("batch_size", 128, 1024), "num_parallel_data_fetch": hp.uniform("num_parallel_data_fetch", cores / 2, cores * 2), "buffer": hp.uniform("buffer", 250, 2048), "parallel_file_reads": hp.uniform("parallel_file_reads", cores / 2, cores * 2), "prefetch_buffer_size": hp.uniform("prefetch_buffer_size", 250, 2048), } experiment_spec = { experiment_name: { "run": _data_fetch_for_search, "stop": { # "training_iteration": 100, "time_total_s": max_time_for_trial_s }, "trial_resources": { "cpu": max(1, cores / num_parallel_trails), "gpu": 0 }, "config": { "data_files": data_files, "batch_size": np.random.randint(128, 1024), "num_parallel_data_fetch": np.random.randint(cores / 2, cores * 2), "buffer": np.random.randint(250, 2048), "parallel_file_reads": np.random.randint(cores / 2, cores * 2), "prefetch_buffer_size": np.random.randint(250, 2048), }, "num_samples": total_trials, "local_dir": ray_results_save_dir, "max_failures": 2 } } algo = HyperOptSearch(space, max_concurrent=num_parallel_trails, reward_attr="samples_freq") run_experiments(experiment_spec, scheduler=ahb, search_alg=algo, verbose=False) # with HyperOpt space # run_experiments(experiment_spec, scheduler=ahb, verbose=False) # with Random space _parse_ray_results(os.path.join(ray_results_save_dir, experiment_name), os.path.join(ray_results_save_dir, experiment_name, "agg_results.csv"))def main(run_with_ray=True):
if run_with_ray:
ray_search([os.path.join(data_folder, "train.tfrecords"), os.path.join(data_folder, "test.tfrecords")],
total_trials=1)
else:"""Run independently without Ray""" def reporter_mock(**kwargs): print("call to reporter: {}\n".format(kwargs)) data_files = [os.path.join(data_folder, "train.tfrecords"), os.path.join(data_folder, "test.tfrecords")] cores = multiprocessing.cpu_count() _data_fetch_for_search({ "data_files": data_files, "batch_size": np.random.randint(128, 1024), "num_parallel_data_fetch": np.random.randint(cores / 2, cores * 2), "buffer": np.random.randint(250, 2048), "parallel_file_reads": np.random.randint(cores / 2, cores * 2), "prefetch_buffer_size": np.random.randint(250, 2048), }, reporter_mock)if __name__ == '__main__':
main()—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
https://github.com/ray-project/ray/issues/3036#issuecomment-429422756,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AEUc5Wloko7nqCxwxWvUbdsSIix3INOmks5ukONegaJpZM4XOhIK
.
Oh good to know. One small question,
what is a good way to pass non-search space data to the trial function (_data_fetch_for_search above)? For example, _data_fetch_for_search above has DATA_FILES global variable which is a list of strings. Similarly I have more complex types like nested dictionaries. Want to avoid using global variables. Tried using config["data_files"] but didn't feel it clean and did not work for passing complex types. I didnt want to refactor everything to a Trainable class just for passing arguments.