mirror of
https://github.com/twitter/the-algorithm.git
synced 2025-01-05 09:01:54 +01:00
ef4c5eb65e
Please note we have force-pushed a new initial commit in order to remove some publicly-available Twitter user information. Note that this process may be required in the future.
943 lines
31 KiB
Python
943 lines
31 KiB
Python
"""
|
|
This module contains utility functions for twml.
|
|
"""
|
|
|
|
import argparse
|
|
from datetime import datetime
|
|
import itertools
|
|
import json
|
|
import logging as _logging
|
|
import os
|
|
import re
|
|
|
|
from twitter.ml.common.resources import AuroraPath
|
|
from twitter.deepbird.hparam import HParams
|
|
from twitter.deepbird.io.util import (
|
|
_get_feature_id, # noqa: F401
|
|
feature_id, # noqa: F401
|
|
preprocess_feature_regex, # noqa: F401
|
|
preprocess_path, # noqa: F401
|
|
sanitize_hdfs_path, # noqa: F401
|
|
is_string, # noqa: F401
|
|
list_files, # noqa: F401
|
|
match_files, # noqa: F401
|
|
)
|
|
from twitter.deepbird.io.legacy.util import (
|
|
batch_apply, # noqa: F401
|
|
boolean_mask, # noqa: F401
|
|
fixed_length_tensor, # noqa: F401
|
|
)
|
|
from twitter.deepbird.sparse.util import (
|
|
convert_to_sparse, # noqa: F401
|
|
limit_bits, # noqa: F401
|
|
)
|
|
|
|
from dateutil import rrule
|
|
from joblib import delayed, Parallel
|
|
from six import string_types
|
|
|
|
from absl import logging
|
|
from libtwml import CLIB, OPLIB # noqa: F401
|
|
import tensorflow.compat.v1 as tf
|
|
from tensorflow.python.platform import tf_logging
|
|
import twml
|
|
from twml.feature_config import FeatureConfigBuilder
|
|
|
|
|
|
# HASHING_PRIME is a large prime that is smaller than 2**32.
# Multiplicative hashing only needs the constant to be co-prime with powers
# of 2 (i.e. odd); any large prime satisfies that, but primality itself is
# not required.
HASHING_PRIME = 2479700537
|
|
|
|
|
|
def multiplicative_hash(input, hash_constant=HASHING_PRIME):  # pylint: disable=redefined-builtin
  """
  Hashes ``input`` by multiplying it with ``hash_constant``.

  No masking is applied; callers that need a bounded output (for example
  ``rehash_sparse_features_nbits``, via ``limit_sparse_tensor_size``) truncate
  the result themselves.

  Args:
    input:
      value(s) to hash; anything that supports ``*`` with an integer
      (e.g. a python int or an integer tensor).
    hash_constant:
      multiplier used for hashing. Defaults to ``HASHING_PRIME``.

  Returns:
    ``input * hash_constant``
  """
  hashed = input * hash_constant
  return hashed
|
|
|
|
|
|
def _return_tensors_from_checkpoint_folder(init_dir, model_name=None):
  """Returns the checkpoint reader's debug string (a listing of saved tensors).

  Args:
    init_dir: Name of the checkpoint directory.
    model_name: the model which we will use to obtain the checkpoint
      (e.g. model.ckpt-50000). If set to None it will default to the
      latest model saved in the checkpoint file.

  Returns:
    ``reader.debug_string()`` as a ``str``.

  Raises:
    ValueError: if ``model_name`` is None and no checkpoint is found in ``init_dir``.
    UnicodeDecodeError: if the debug string is not valid UTF-8 (an error is logged first).
  """
  if model_name is None:
    # gets the most recently generated model.ckpt file
    model_path = tf.train.latest_checkpoint(init_dir)
    if model_path is None:
      raise ValueError("Could not find a valid model checkpoint inside the directory")
  else:
    model_path = os.path.join(init_dir, model_name)

  reader = tf.train.NewCheckpointReader(model_path)
  debug_string = reader.debug_string()
  # Guard in case debug_string() already returns str; only bytes need decoding.
  if isinstance(debug_string, str):
    return debug_string
  try:
    return debug_string.decode("utf-8")
  except UnicodeDecodeError:
    # decode() signals failure with UnicodeDecodeError (not OSError). Log and
    # re-raise instead of returning None, which would only make callers fail
    # later on `.splitlines()`.
    logging.error('Could not decode the string')
    raise
|
|
|
|
|
|
def get_scope_dict(init_dir, incoming_scope_name, current_scope_name, model_name=None):
  """Returns a map from saved variable scopes to renamed scopes for a checkpoint.

  Args:
    init_dir:
      Name of the checkpoint directory.
    incoming_scope_name:
      scope name of the previous phase
    current_scope_name:
      scope name of current phase
    model_name:
      the model which we will use to obtain the checkpoint
      (e.g. model.ckpt-50000) if set to None it will default
      to the latest model saved in the checkpoint file.

  Returns:
    init_map:
      dict mapping each saved variable scope (the variable name up to its last
      "/", plus a trailing "/") to the same scope with the first occurrence of
      ``incoming_scope_name`` replaced by ``current_scope_name``. Entries whose
      dump line contains 'global_step' are skipped.
  """
  init_map = {}
  reader_dump = _return_tensors_from_checkpoint_folder(
    init_dir=init_dir, model_name=model_name).splitlines()

  for member in reader_dump:
    # remove global_step since it is not necessary
    if 'global_step' in member:
      continue
    # The first space-separated token of each dump line is the variable name.
    saved_variable = member.split(" ")[0]
    saved_scope = saved_variable.rsplit('/', 1)[0] + "/"
    # create key in init_map (the value depends only on the key, so the first
    # occurrence is as good as any)
    if saved_scope not in init_map:
      init_map[saved_scope] = saved_scope.replace(incoming_scope_name, current_scope_name, 1)

  return init_map
|
|
|
|
|
|
def get_init_map(
    init_from_dir,
    exclude_var_names=None,
    exclude_name_scopes=None,
    name_scope_to_remove=None,
    name_scope_to_prepend=None):
  """
  Builds a map for initializing from a checkpoint (see tf.train.init_from_checkpoint).

  It assumes that the latter part of the variable names are consistent between the checkpoint and
  the new model, but their name_scopes may be different. If the checkpoint model has variable names
  of the form old/scope/var/foo, and the corresponding variable names for the new model should be
  my/new/scope/var/foo, then you should set name_scope_to_remove = 'old/' and
  name_scope_to_prepend = 'my/new/'.

  This function can be used to

  1. Generate an ``init_map`` map that can be passed to the ``Trainer`` init or
  2. Used to generate an ``init_map`` directly inside ``build_graph_fn``, in
     which case it should be passed directly to ``tf.train.init_from_checkpoint`` inside
     ``build_graph_fn``, in which case you do not also need to specify the ``init_map`` argument to
     the trainer.

  Parameters
  ----------
  init_from_dir: Directory containing checkpoint
  exclude_var_names: list[str]
    List of variables in the checkpoint that should be excluded from the map.
  exclude_name_scopes: list[str]
    List of name_scopes in the checkpoint model that should be excluded from the map.
  name_scope_to_remove: str
    portion of name_scope for checkpoint variables that should not be included in variable names
    for new model. A trailing '/' is added if missing, and only its first occurrence in each
    variable name is removed.
  name_scope_to_prepend: str
    name_scope to prepend to variable names in checkpoint to give variable names for new model.
    A trailing '/' is added if missing.

  Returns
  -------
  dict
    keys are variable names in the checkpoint and values are variable names in the new model,
    into which the checkpoint parameters should be loaded.
  """
  vars_to_restore = get_checkpoint_variable_names(
    init_from_dir,
    exclude_var_names=exclude_var_names,
    exclude_scopes=exclude_name_scopes,
  )

  if name_scope_to_prepend is not None and not name_scope_to_prepend.endswith('/'):
    name_scope_to_prepend += '/'

  if name_scope_to_remove is not None and not name_scope_to_remove.endswith('/'):
    name_scope_to_remove += '/'

  init_map = {}
  for var_name_checkpoint in vars_to_restore:
    var_name_new_model = var_name_checkpoint

    if name_scope_to_remove is not None:
      # Remove only the first occurrence: an unbounded str.replace would also
      # delete later repetitions of the scope inside the name
      # (e.g. 'old/a/old/b' would become 'a/b' instead of 'a/old/b').
      var_name_new_model = var_name_new_model.replace(name_scope_to_remove, '', 1)

    if name_scope_to_prepend is not None:
      var_name_new_model = name_scope_to_prepend + var_name_new_model

    init_map[var_name_checkpoint] = var_name_new_model

  return init_map
|
|
|
|
|
|
def get_checkpoint_variable_names(model_dir, exclude_var_names=None, exclude_scopes=None):
  """
  Gets a list of variable names from the latest checkpoint in model_dir.
  Removes variables with scope defined by exclude_scopes, and/or with names defined by
  exclude_var_names.

  Args:
    model_dir (str): Directory containing checkpoint file for the pre-trained model
    exclude_var_names (list): Optional variable names to exclude (can include full/partial scope);
      a variable is excluded when its name ends with any of these strings.
    exclude_scopes (list): Optional scopes to exclude; a variable is excluded when its name
      starts with any of these strings.

  Returns:
    list: variable names

  Raises:
    ValueError: if no checkpoint can be found in model_dir.
  """
  checkpoint_path = tf.train.latest_checkpoint(model_dir)
  if checkpoint_path is None:
    # Without this check, None would be passed to list_variables and fail obscurely.
    raise ValueError("Could not find a valid model checkpoint inside the directory: %s" % model_dir)
  variables_and_shapes = tf.train.list_variables(checkpoint_path)

  # str.startswith / str.endswith accept a tuple of candidates.
  excluded_scopes = tuple(exclude_scopes) if exclude_scopes else ()
  excluded_vars = tuple(exclude_var_names) if exclude_var_names else ()

  def _keep(name):
    if excluded_scopes and name.startswith(excluded_scopes):
      return False
    if excluded_vars and name.endswith(excluded_vars):
      return False
    return True

  return [name for name, _ in variables_and_shapes if _keep(name)]
|
|
|
|
|
|
def to_snake_case(name):
  """
  Changes name to snake case.

  E.g. "CamelCase" -> "camel_case" and "HTTPResponse" -> "http_response".
  Names whose snake-case form starts with "_" are prefixed with "private"
  (see below). An empty name yields an empty string.
  """
  # Insert "_" before each capitalized word ("HTTPResponse" -> "HTTP_Response").
  intermediate = re.sub('(.)([A-Z][a-z0-9]+)', r'\1_\2', name)
  # Insert "_" at remaining lower->upper boundaries ("camel_CaseName" -> "camel_Case_Name").
  insecure = re.sub('([a-z])([A-Z])', r'\1_\2', intermediate).lower()
  # If the class is private the name starts with "_" which is not secure
  # for creating scopes. We prefix the name with "private" in this case.
  # startswith() also copes with an empty string, where indexing [0] would fail.
  if insecure.startswith('_'):
    return 'private' + insecure
  return insecure
|
|
|
|
|
|
def copy_phase_inputs(init_dir, dest_dir):
  """Copies every ``*.json.tf`` file in ``init_dir`` into ``dest_dir``.

  This allows several phases' parameters to be loaded at the same time.
  Nothing happens when ``init_dir`` is None; ``dest_dir`` is created the first
  time a matching file is found.

  Args:
    init_dir:
      Name of the checkpoint directory.
    dest_dir:
      Name of the output directory.
  """
  if init_dir is None:
    return

  # tf.io.gfile works with both local and HDFS paths.
  phase_files = [name for name in tf.io.gfile.listdir(init_dir) if name.endswith(".json.tf")]
  for name in phase_files:
    source = os.path.join(init_dir, name)
    target = os.path.join(dest_dir, name)
    if not tf.io.gfile.exists(dest_dir):
      try:
        tf.io.gfile.makedirs(dest_dir)
      except OSError:
        # Another worker may have created the directory concurrently;
        # only propagate the error if it is still missing.
        if not tf.io.gfile.isdir(dest_dir):
          raise
    # Overwrite on purpose: the target may be stale, and in distributed
    # training the same file can be copied several times.
    tf.io.gfile.copy(source, target, overwrite=True)
|
|
|
|
|
|
def rehash_sparse_features_nbits(sp_a, nbits, hash_fn=multiplicative_hash):
  """
  Rehashes the feature ids (column 1 of the indices) of a sparse tensor and
  limits the result to ``nbits`` bits.

  Spreading feature ids out this way can make their distribution more uniform,
  which may help in some situations - typically on the output of
  PercentileDiscretizer, which assigns many bins to low-valued feature ids.

  Input feature IDs should take values less than 2**32,
  and nbits should be less than 32.

  Args:
    sp_a:
      a tf.SparseTensor object
    nbits:
      integer number of bits to mask output feature_ids
    hash_fn:
      Function that takes integer values and returns hashes of these values.
      Its output does not need to be masked to ``nbits``; that is done here.
      Default value = multiplicative_hash.

  Returns:
    a new tf.SparseTensor
  """
  hashed_feature_ids = hash_fn(sp_a.indices[:, 1])
  sample_ids = sp_a.indices[:, 0]

  rehashed = tf.SparseTensor(
    tf.stack([sample_ids, hashed_feature_ids], axis=1),
    sp_a.values,
    sp_a.dense_shape,
  )

  # 2**nbits must be >= the batch size, otherwise the mask applied by
  # limit_sparse_tensor_size also squashes the sample ids.
  return limit_sparse_tensor_size(rehashed, nbits)
|
|
|
|
|
|
def convert_to_hparams(opt):
  """
  Converts argparse.Namespace object to twitter.deepbird.hparam.hparam.HParams.
  Note that tensorflow.contrib.training.HParams is gone in TF 2.x, and we forward ported
  tensorflow.contrib.training.HParams to twitter.deepbird.hparam.hparam.HParams.

  String values are passed through ``sanitize_hdfs_path`` before being added.

  NOTE: If you are using estimators, please don't call this method and directly pass python dict
  to TensorFlow estimator. Starting TensorFlow 2.0, Estimator will only accept dicts.

  Args:
    opt: an argparse.Namespace, a dict, or an HParams instance.

  Returns:
    a new HParams holding the same key/value pairs.

  Raises:
    ValueError: if opt is of any other type.
  """
  # Convert to dict so we can iterate through it cleanly.
  if isinstance(opt, argparse.Namespace):
    params_dict = vars(opt)
  elif isinstance(opt, dict):
    params_dict = opt
  elif isinstance(opt, HParams):
    logging.warning('If you are using Estimator, please pass python dict directly to Estimator.')
    params_dict = opt.values()
  else:
    raise ValueError("Input can not be of type %s. "
                     "It can be one of { argparse.Namespace, dict, "
                     "twitter.deepbird.hparam.HParams}."
                     % type(opt))

  params = HParams()
  # Hack to convert all parameters from hdfs:/// format to hdfs://default/
  for key, val in params_dict.items():
    # Fix the path if the value is a string
    params.add_hparam(key, sanitize_hdfs_path(val) if isinstance(val, str) else val)

  return params
|
|
|
|
|
|
def dynamic_partition(features, partitions, num_partitions=2, name=None):
  """
  Partitions each of the tensor in features using the provided mask.

  Args:
    features:
      A single tensor or an iterable of tensors (list, tuple, dict)
    partitions:
      A bool or integer tensor representing the partitions, or a container
      indexed like ``features`` holding one partition tensor per feature.
    num_partitions:
      int, number of partitions produced for every feature. Defaults to 2.
    name:
      optional op name; per-feature ops are named ``name + "_" + str(key)``.

  Returns partitioned outputs as a list. Each element of the list is the same type as features.

  This uses tf.dynamic_partition but adds the following niceties:
    - features can be a list or dict of different tensor types.
    - only a partition tensor is used to partition all the feature tensors recursively.
    - the partition tensor is automatically converted into an integer tensor.
    - defaults to num_partitions == 2
  """
  if not isinstance(features, (dict, list, tuple, tf.Tensor)):
    raise AssertionError("features container must be a dict, list, or tuple, tf.Tensor")

  if isinstance(partitions, tf.Tensor):
    partitions = tf.cast(partitions, tf.int32)

  if isinstance(features, tf.Tensor):
    return tf.dynamic_partition(features, partitions, num_partitions, name)

  # One output container per partition. Lists are filled by index and
  # converted back to tuples at the end when needed.
  if isinstance(features, (tuple, list)):
    outputs = [[None] * len(features) for _ in range(num_partitions)]
  else:
    outputs = [{} for _ in range(num_partitions)]

  iterable = features.items() if isinstance(features, dict) else enumerate(features)

  for key, feature in iterable:
    name_key = None if name is None else name + "_" + str(key)
    if isinstance(partitions, tf.Tensor):
      feature_partitions = partitions
    else:
      # Per-feature partitions: cast like the single-tensor case so bool
      # masks are accepted too.
      feature_partitions = tf.cast(partitions[key], tf.int32)
    # num_partitions is an int (it sized `outputs` above); it is not indexable per key.
    results = tf.dynamic_partition(feature, feature_partitions, num_partitions, name_key)
    # Append the result to the proper output container
    for idx, result in enumerate(results):
      outputs[idx][key] = result

  # if input is tuple, convert list of lists back to list of tuples
  if isinstance(features, tuple):
    outputs = [type(features)(output) for output in outputs]

  return outputs
|
|
|
|
|
|
def write_file(filename, contents, encode=False):
  '''
  Writes contents to a file, optionally JSON-encoding them first.

  Arguments:
    filename:
      path to file where the contents will be saved.
      Accepts HDFS and local paths.
    contents:
      contents to save to the file.
      Must be a string when encode is False.
    encode:
      False | 'json'. When encode='json', contents is encoded
      with json.dumps.

  Raises:
    ValueError: if encode is not 'json' and contents is not a string.
  '''
  if encode == 'json':
    payload = json.dumps(contents)
  elif is_string(contents):
    payload = contents
  else:
    raise ValueError("Expecting string for encode=False")

  # Build the write op in a fresh graph so the caller's default graph is untouched.
  write_graph = tf.Graph()
  with write_graph.as_default():
    write_op = tf.write_file(filename, payload)

  with tf.Session(graph=write_graph) as session:
    session.run(write_op)
|
|
|
|
|
|
def read_file(filename, decode=False):
  '''
  Reads the contents of a file and optionally JSON-decodes them.

  Arguments:
    filename:
      path to file where the contents will be loaded from.
      Accepts HDFS and local paths.
    decode:
      False | 'json'. When decode='json', contents is decoded
      with json.loads. When False, contents is returned as is.

  Returns:
    contents (a str, or the json.loads result when decode='json')
  '''
  # Build the read op in a fresh graph so the caller's default graph is untouched.
  read_graph = tf.Graph()
  with read_graph.as_default():
    read_op = tf.read_file(filename)

  with tf.Session(graph=read_graph) as session:
    raw = session.run(read_op)

  # particular version of TF and/or Python may or may not perform decoding step from utf-8 to str
  text = raw if isinstance(raw, str) else raw.decode()

  return json.loads(text) if decode == 'json' else text
|
|
|
|
def setup_tf_logging_formatter():
  """
  Sets absl and TensorFlow logging to INFO and applies a common
  "<asctime> [<levelname>] <name>: <message>" format to the absl handler and to
  the first handler of the TensorFlow logger (if it has any).
  """
  formatter = _logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s', None)

  # Setting up absl logging verbosity
  logging.set_verbosity('info')
  logging.set_stderrthreshold('info')
  logging.get_absl_handler().setFormatter(formatter)

  tf.logging.set_verbosity(tf.logging.INFO)

  # Set tensorflow logging handler format
  tf_handlers = tf_logging.get_logger().handlers
  if tf_handlers:
    tf_handlers[0].setFormatter(formatter)
|
|
|
|
|
|
def set_tensorflow_log_level(log_level):
  """
  Sets TensorFlow's C++ logging level through the TF_CPP_MIN_LOG_LEVEL
  environment variable.

    0. all logs are shown.
    1. filter out INFO logs.
    2. filter out WARNINGs and INFOs.
    3. filter out ERRORs, WARNINGs, and INFOs.

  Note that tf.Print output are INFO logs, so setting log_level above 0 would hide
  output from tf.Print.

  Raises:
    AssertionError: if log_level is not an int in [0, 3].
  """
  assert isinstance(log_level, int) and 0 <= log_level <= 3
  os.environ['TF_CPP_MIN_LOG_LEVEL'] = str(log_level)
|
|
|
|
|
|
def weighted_average(values, weights):
  """
  Returns sum(values * weights) / sum(weights).

  E.g. this is usually used to compute a weighted loss given sample weights.
  The division is not guarded against a total weight of zero.
  """
  weighted_sum = tf.reduce_sum(tf.multiply(values, weights))
  total_weight = tf.reduce_sum(weights)
  return weighted_sum / total_weight
|
|
|
|
|
|
def backup_checkpoint(checkpoint_path_prefix,
                      backup_path='backup',
                      empty_backup=True):
  """
  Creates a backup copy of a checkpoint in backup_dir.
  This function is used by the Trainer for early-stopping.

  Arguments:
    checkpoint_path_prefix:
      Prefix of the path to the checkpoint files. The file named exactly
      ``checkpoint_path_prefix`` (if any) and every ``checkpoint_path_prefix.*``
      file are copied.
    backup_path:
      path to a directory where checkpoint files will be backed up.
      It is created if missing.
    empty_backup:
      When True (the default), the current contents of the backup directory
      are removed before the backup is performed.

  Returns:
    The number of backed up files.

  Raises:
    twml.errors.CheckpointNotFoundError: if no checkpoint file matches the
      prefix or a filesystem operation fails while copying.
  """
  checkpoint_file_prefix = os.path.basename(checkpoint_path_prefix)

  if empty_backup and tf.io.gfile.exists(backup_path):
    tf.io.gfile.rmtree(backup_path)

  # makedirs (unlike mkdir) succeeds when the directory already exists, which is
  # always the case with empty_backup=False once a backup has been made.
  tf.io.gfile.makedirs(backup_path)

  n_backup = 0
  # copy all checkpoint files to backup directory
  try:
    # Match "<prefix>" and "<prefix>.<suffix>" only: a bare "<prefix>*" would
    # also pick up other checkpoints, e.g. model.ckpt-1000.* for model.ckpt-100.
    checkpoint_files = (tf.io.gfile.glob(checkpoint_path_prefix) +
                        tf.io.gfile.glob(checkpoint_path_prefix + ".*"))
    if not checkpoint_files:
      raise twml.errors.CheckpointNotFoundError("%s not found" % checkpoint_path_prefix)
    for filename in checkpoint_files:
      tf.io.gfile.copy(
        src=filename,
        dst=os.path.join(backup_path, os.path.basename(filename))
      )
      n_backup += 1
  except tf.errors.OpError as ex:
    raise twml.errors.CheckpointNotFoundError(
      f"{str(ex)}\n {checkpoint_path_prefix} not found."
    ) from ex

  # tf.train.latest_checkpoint needs the 'checkpoint' file.
  with tf.io.gfile.GFile(os.path.join(backup_path, 'checkpoint'), 'w') as f:
    f.write('model_checkpoint_path: "%s"\n' % checkpoint_file_prefix)

  return n_backup
|
|
|
|
|
|
def set_only_checkpoint(source_path, dest_path, remove_source=True):
  """
  Makes the latest checkpoint in source_path the only checkpoint in dest_path.

  If source_path contains a checkpoint, every model.ckpt* file is removed from
  dest_path and then every file in source_path is moved into dest_path
  (overwriting existing files such as the "checkpoint" file). If source_path
  has no checkpoint, nothing is done.

  Arguments:
    source_path:
      path to directory containing the latest checkpoint.
      Should contain a valid checkpoint file and model.ckpt files.
      For early-stopping, this should be the save_dir/best_checkpoint dir.
    dest_path:
      path to directory where the latest checkpoint files will be moved.
      All its model.ckpt* files will be removed.
      For early-stopping, this should be the save_dir.
    remove_source:
      When True (the default), deletes the source directory (and any remaining
      contents) after the move. Note that even when False, its files are
      moved to dest_path anyway.
  """
  # make it so that source_path checkpoint is the only checkpoint
  source_path_prefix = tf.train.latest_checkpoint(source_path)
  if source_path_prefix is None:
    return

  # remove intermediate checkpoints
  for filename in tf.io.gfile.listdir(dest_path):
    if filename.startswith("model.ckpt"):
      # tf.io.gfile exposes `remove`; `Remove` only exists on the legacy tf.gfile API.
      tf.io.gfile.remove(os.path.join(dest_path, filename))

  # move contents of source_path to dest_path
  for filename in tf.io.gfile.listdir(source_path):
    # Positional arguments: tf.io.gfile.rename names them (src, dst), not
    # (oldname, newname) as the legacy tf.gfile.Rename did.
    tf.io.gfile.rename(
      os.path.join(source_path, filename),
      os.path.join(dest_path, filename),
      overwrite=True)  # overwrite "checkpoint" file

  # delete the source_path dir
  if remove_source:
    tf.io.gfile.rmtree(source_path)
|
|
|
|
|
|
def list_files_by_datetime(
  base_path,
  start_datetime,
  end_datetime=None,
  datetime_prefix_format='%Y/%m/%d/%H',
  extension='lzo',
  parallelism=1,
  hour_resolution=1,
  sort=False
):
  """List files matching `base_path/dt_prefix_format/*.extension` for the requested datetime range.

  Args:
    base_path:
      The base path. If `None`, returns `None`.
    start_datetime:
      A `datetime.datetime` or string representing the start of the range (inclusive).
      If `None`, it returns `list_files(base_path, extension, sort)`.
    end_datetime:
      A `datetime.datetime` or string representing the end of the range (inclusive).
      If `None`, assumed to be the same as start_datetime.
    datetime_prefix_format:
      Format compatible with `datetime.datetime.strftime`
      (https://docs.python.org/2/library/datetime.html#strftime-and-strptime-behavior).
      String datetimes are parsed with this same format.
    extension:
      The extension of the files composing the dataset (e.g. 'lzo').
    parallelism:
      The number of threads used to process list patterns (this is mostly useful
      when dealing with filesystems such as HDFS in which listing files is a potentially expensive
      operation).
    hour_resolution:
      The separation between consecutive hours. The default value is 1.
    sort:
      bool, whether to return a sorted list of files. Default False.

  Returns:
    A list with all the matching files.

  Raises:
    errors.OpError: If there are filesystem / directory listing errors.
    OSError: If no file matches.
  """
  if hour_resolution is None:
    hour_resolution = 1

  if base_path is None:
    return None

  if start_datetime is None:
    return list_files(base_path, extension, sort)

  # Do this in case people want to use a single day for training.
  if end_datetime is None:
    end_datetime = start_datetime

  assert parallelism > 0

  if isinstance(start_datetime, str):
    start_datetime = datetime.strptime(start_datetime, datetime_prefix_format)

  if isinstance(end_datetime, str):
    end_datetime = datetime.strptime(end_datetime, datetime_prefix_format)

  assert isinstance(start_datetime, datetime)
  assert isinstance(end_datetime, datetime)
  # Compare only after parsing: a str vs datetime comparison raises TypeError,
  # and two strs would be compared lexicographically, not chronologically.
  assert start_datetime <= end_datetime

  base_path = preprocess_path(base_path)

  def _handle_missing_globs(pattern):
    try:
      return tf.io.gfile.glob(pattern)
    except tf.errors.NotFoundError as e:
      tf.logging.warning(e.message)
      return []

  # a set is used because there might be some repeated globs depending on dt_prefix_format
  globs = {
    os.path.join(base_path, dt.strftime(datetime_prefix_format), '*.%s' % extension)
    for dt in rrule.rrule(
      freq=rrule.HOURLY, interval=hour_resolution, dtstart=start_datetime, until=end_datetime)
  }
  nested_files = Parallel(n_jobs=parallelism, backend='threading')(
    delayed(_handle_missing_globs)(p) for p in globs
  )
  flattened_files = list(itertools.chain.from_iterable(nested_files))

  if not flattened_files:
    error_msg = "Files list is empty: base_path={base_path}, start_datetime={start_datetime}, end_datetime={end_datetime}".format(
      base_path=base_path, start_datetime=start_datetime, end_datetime=end_datetime
    )
    raise OSError(error_msg)

  if sort:
    flattened_files = sorted(flattened_files)

  return flattened_files
|
|
|
|
|
|
def limit_sparse_tensor_size(sparse_tf, input_size_bits, mask_indices=True):
  """
  Returns a ``tf.SparseTensor`` which is the input SparseTensor
  limited to the specified input_size_bits

  Args:
    sparse_tf:
      twml.SparseTensor or tf.SparseTensor
    input_size_bits:
      The number of bits allocated to the input size.
      Input size will be power(2,input_size_bits).
      Note that twml.limit_bits truncates any feature keys that
      exceed the input size.
    mask_indices:
      If mask indices is False; only the shape is changed. Defaults to True.

  Returns:
    a tf.SparseTensor with the same values, dense_shape
    [original dense_shape[0], 2**input_size_bits], and (optionally) masked indices.

  Raises:
    TypeError: if sparse_tf is neither a twml.SparseTensor nor a tf.SparseTensor.
  """
  if isinstance(sparse_tf, twml.SparseTensor):
    sparse_tf = sparse_tf.to_tf()
  if not isinstance(sparse_tf, tf.SparseTensor):
    raise TypeError('Input argument `sparse_tf` should either be of type '
                    'twml.SparseTensor or tf.SparseTensor. Found type: {}'.
                    format(type(sparse_tf)))
  if mask_indices:
    indices = twml.limit_bits(sparse_tf.indices, input_size_bits)
  else:
    indices = sparse_tf.indices
  dense_shape = tf.stack([sparse_tf.dense_shape[0], 1 << input_size_bits])
  return tf.SparseTensor(indices=indices, values=sparse_tf.values,
                         dense_shape=dense_shape)
|
|
|
|
|
|
def create_module_spec(mlp_fn, mode, params, drop_collections=None):
  """
  Creates a TF-Hub module spec from ``mlp_fn`` with a standard ``tags_and_args``:
  an untagged serving graph and a {"train"}-tagged training graph, both called
  with ``params`` and ``mode``. Equivalent to
  spec = hub.create_module_spec(mlp_fn, tags_and_args=tags_and_args, drop_collections=...).

  Args:
    mlp_fn:
      a function to build a graph for the Module.
    mode:
      mode in which the Estimator is run
    params:
      parameters passed to the Estimator
    drop_collections:
      forwarded unchanged to ``hub.create_module_spec``.

  Returns:
    the module spec returned by ``hub.create_module_spec``.
  """
  import tensorflow_hub as hub  # noqa: F402
  tags_and_args = [(set(), {"params": params, "mode": mode}),  # serving graph
                   ({"train"}, {"params": params, "mode": mode})  # training graph
                   ]
  spec = hub.create_module_spec(mlp_fn, tags_and_args=tags_and_args, drop_collections=drop_collections)
  return spec
|
|
|
|
|
|
def change_name_scope_from_dir(init_scope_name, final_scope_name, save_dir):
  """
  Changes the name of the saved scope to the desired name and saves it
  to the same save_dir.

  Args:
    init_scope_name:
      initial scope name
    final_scope_name:
      desired (final) scope name
    save_dir:
      directory which the scopes are saved

  Raises:
    OSError: if save_dir contains no checkpoint.

  In the following section we:
    - Read all the variables from the latest checkpoint.
    - Make a copy of the variables with new name scope (variables whose name
      does not change are only kept once).
    - Store both sets of variables into the latest checkpoint.
  This essentially doubles up the size of the checkpoint.
  But when a job is restarted after this part is done, the checkpoint size doubles again.
  To avoid doing this, we create a copy in backup if a backup isn't found.
  This allows us to always read (from backup) and write same sized checkpoint files.
  """
  # Create a backup_checkpoints dir
  backup_dir = os.path.join(save_dir, "change_name_scope_backups")
  tf.io.gfile.makedirs(backup_dir)

  latest_checkpoint = tf.train.latest_checkpoint(save_dir)
  if latest_checkpoint is None:
    raise OSError("No checkpoints found in save_dir: %s" % save_dir)

  latest_backup_checkpoint = tf.train.latest_checkpoint(backup_dir)
  if (latest_backup_checkpoint is None or
      os.path.basename(latest_checkpoint) != os.path.basename(latest_backup_checkpoint)):
    backup_checkpoint(latest_checkpoint, backup_dir, empty_backup=False)

  variables = tf.train.list_variables(backup_dir)
  # `with tf.Session()` (unlike `tf.Session().as_default()`) also closes the
  # session when the block exits.
  with tf.Graph().as_default(), tf.Session() as sess:
    new_variables = []
    for name, _ in variables:
      var = tf.train.load_variable(backup_dir, name)
      renamed = name.replace(init_scope_name, final_scope_name)
      # Append both the rename and the original variable. When the rename is a
      # no-op, a second variable would be uniquified to "<name>_1" and saved
      # as a spurious extra entry, so only the original is kept.
      if renamed != name:
        new_variables.append(tf.Variable(var, name=renamed))
      new_variables.append(tf.Variable(var, name=name))
    # Save this to the checkpoint in the save_dir
    saver = tf.train.Saver(new_variables)
    sess.run(tf.global_variables_initializer())
    saver.save(sess, latest_checkpoint)  # pylint: disable=no-member
|
|
|
|
|
|
def hub_import(input, module, module_name, trainable=False):  # pylint: disable=redefined-builtin
  """
  Loads an exported hub module and applies it to ``input``.

  Args:
    input:
      input to hub module
    module:
      module path
    module_name:
      signature of the exported hub module
    trainable:
      forwarded to ``hub.Module``. Defaults to False.

  Returns:
    the output of calling the module on ``input`` with ``signature=module_name``.
  """
  import tensorflow_hub as hub  # noqa: F402
  loaded_module = hub.Module(module, trainable=trainable)
  return loaded_module(input, signature=module_name)
|
|
|
|
|
|
def _extract_hash_space_bits(feature_config):
  """
  Extracts the sparse hash-space sizes declared by a contrib.FeatureConfig.

  Arguments:
    feature_config:
      Feature Configuration of the type contrib.FeatureConfig

  Returns:
    Dictionary mapping each sparse extraction config's output_name to its hash_space_bits.

  Raises:
    TypeError: if feature_config is not a contrib.FeatureConfig.
  """
  if not isinstance(feature_config, twml.contrib.feature_config.FeatureConfig):
    raise TypeError(f"Feature config must be of type contrib.FeatureConfig: {type(feature_config)}")
  return {
    extraction.output_name: extraction.hash_space_bits
    for extraction in feature_config.sparse_extraction_configs
  }
|
|
|
|
|
|
def fix_shape_sparse(features, feature_config):
  """
  Sets the dense shape of hashing-trick sparse features to
  [batch, 2**hash_space_bits], in place, without masking their indices.

  Only keys present both in ``features`` and in the config's sparse
  extraction outputs are updated.

  Arguments:
    features:
      Feature dictionary extracted by the feature config (modified in place)
    feature_config:
      Feature Configuration of the type contrib.FeatureConfig

  Raises:
    TypeError: if feature_config is not a contrib.FeatureConfig or features is not a dict.
  """
  if not isinstance(feature_config, twml.contrib.feature_config.FeatureConfig):
    raise TypeError(f"Feature config must be of type contrib.FeatureConfig, currently of {type(feature_config)}")
  bits_by_name = _extract_hash_space_bits(feature_config)
  if not isinstance(features, dict):
    raise TypeError(f"features must be of dictionary type, it is of {type(features)} type")
  for feature_name in set(features) & set(bits_by_name):
    features[feature_name] = limit_sparse_tensor_size(
      features[feature_name], bits_by_name[feature_name], mask_indices=False)
|
|
|
|
|
|
def touch_file_in_dir(directory, filename):
  """
  Creates an empty file named filename in directory (opened in "w" mode, so
  an existing file is truncated).

  Arguments:
    directory: (str)
    filename: (str)
  """
  with tf.io.gfile.GFile(os.path.join(directory, filename), "w") as handle:
    handle.write("")
|
|
|
|
|
|
def file_exist_in_dir(directory: str, filename: str) -> bool:
  """Returns whether ``directory/filename`` exists, according to tf.io.gfile."""
  return tf.io.gfile.exists(os.path.join(directory, filename))
|
|
|
|
|
|
def copy_to_local(remote, local, filename, overwrite=False):
  """Copies ``filename`` from the ``remote`` directory into the ``local`` directory.

  ``local`` is created if needed and must not be an hdfs:// path.

  Arguments:
    remote: source directory.
    local: destination directory (local filesystem).
    filename: name of the file to copy.
    overwrite: whether an existing destination file may be replaced.

  Returns:
    the return value of tf.io.gfile.copy.
  """
  assert "hdfs://" not in local
  tf.io.gfile.makedirs(local)
  source = os.path.join(remote, filename)
  target = os.path.join(local, filename)
  return tf.io.gfile.copy(source, target, overwrite=overwrite)
|
|
|
|
|
|
def copy_recursive(src, dst, overwrite=False):
  """
  Function to copy a directory recursively.

  Arguments:
    src: Source directory.
    dst: Destination directory. Mirrors the tree under src; created as needed.
    overwrite: Specifies if files are to be overwritten if they exist.
  """
  src = src.rstrip("/")
  dst = dst.rstrip("/")

  for dirname, _, files in tf.io.gfile.walk(src):
    # walk() yields `src` itself and paths joined under it, so swap only the
    # leading `src` prefix. str.replace would also rewrite any later occurrence
    # of the `src` string inside the path (e.g. src='/a' and '/a/x/a').
    dst_dirname = dst + dirname[len(src):]
    tf.io.gfile.makedirs(dst_dirname)

    for f in files:
      src_f = os.path.join(dirname, f)
      dst_f = os.path.join(dst_dirname, f)

      tf.logging.info(f"Copying {src_f} to {dst_f}")
      tf.io.gfile.copy(src_f, dst_f, overwrite=overwrite)
|
|
|
|
|
|
def delete_file_or_dir(path):
  """
  Deletes the file or directory given by `path`; directories are removed
  recursively.

  Arguments:
    path:
      string indicating path of file or directory to remove
  """
  remover = tf.io.gfile.rmtree if tf.io.gfile.isdir(path) else tf.io.gfile.remove
  remover(path)
|
|
|
|
|
|
def get_distributed_training_job_path():
  """
  Builds the AuroraPath of a distributed training job from the
  TWML_JOB_CLUSTER, TWML_JOB_ROLE, TWML_JOB_ENV and
  TWML_DISTRIBUTED_BASE_JOBNAME environment variables (unset ones become None).

  Note: distributed training has three jobs, one parameter server job,
  one worker job and one evaluator job. All of these three jobs' name
  share a common base job name.
  """
  environ = os.environ
  return AuroraPath(
    dc=environ.get("TWML_JOB_CLUSTER"),
    role=environ.get("TWML_JOB_ROLE"),
    env=environ.get("TWML_JOB_ENV"),
    job_name=environ.get("TWML_DISTRIBUTED_BASE_JOBNAME"),
  )
|
|
|
|
def do_every_n_steps(action, num_steps):
  """
  Runs a TensorFlow action only on steps where `global_step` is a multiple of
  `num_steps`; on other steps a no-op runs instead.

  Args:
    action: callable to be performed at regular intervals. This callable
      must return a TF op with no output tensors.
    num_steps: period of performing the action, as measured
      in number of training steps

  Returns:
    A TensorFlow op with no output tensors, like a tf.print() or tf.no_op().
    You must use tf.control_dependencies() to execute the op.
  """
  global_step = tf.train.get_or_create_global_step()
  is_due = tf.math.equal(tf.math.floormod(global_step, num_steps), 0)
  return tf.cond(is_due, action, tf.no_op)
|