Delete trust_and_safety_models directory

This commit is contained in:
dogemanttv 2024-01-10 17:09:42 -06:00 committed by GitHub
parent ac135ef78e
commit fa00596ed2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 0 additions and 2793 deletions

View File

@ -1,10 +0,0 @@
Trust and Safety Models
=======================
We decided to open source the training code of the following models:
- pNSFWMedia: Model to detect tweets with NSFW images. This includes adult and porn content.
- pNSFWText: Model to detect tweets with NSFW text, adult/sexual topics.
- pToxicity: Model to detect toxic tweets. Toxicity includes marginal content like insults and certain types of harassment. Toxic content does not violate Twitter's terms of service.
- pAbuse: Model to detect abusive content. This includes violations of Twitter's terms of service, including hate speech, targeted harassment and abusive behavior.
We have several more models and rules that we are not going to open source at this time because of the adversarial nature of this area. The team is considering open sourcing more models going forward and will keep the community posted accordingly.

View File

@ -1,276 +0,0 @@
import tensorflow as tf
physical_devices = tf.config.list_physical_devices('GPU')
for device in physical_devices:
tf.config.experimental.set_memory_growth(device, True)
from twitter.hmli.nimbus.modeling.model_config import FeatureType, EncodingType, Feature, Model, LogType
from twitter.hmli.nimbus.modeling.feature_loader import BigQueryFeatureLoader
from twitter.cuad.representation.models.text_encoder import TextEncoder
from twitter.cuad.representation.models.optimization import create_optimizer
from twitter.hmli.nimbus.modeling.feature_encoder import FeatureEncoder
import numpy as np
import pandas as pd
import utils
# --- Feature and model configuration for the `ptos_prototype` model ---
# "..." is a redaction placeholder left when this notebook was open-sourced;
# the real category names are not published.
cat_names = [
...
]
# One continuous input feature per (redacted) category name.
category_features = [Feature(name=cat_name, ftype=FeatureType.CONTINUOUS) for cat_name in cat_names]
features = [
    # Tweet text (with media annotations) is BERT-encoded downstream.
    Feature(name="tweet_text_with_media_annotations", ftype=FeatureType.STRING, encoding=EncodingType.BERT),
    Feature(name="precision_nsfw", ftype=FeatureType.CONTINUOUS),
    Feature(name="has_media", ftype=FeatureType.BINARY),
    Feature(name="num_media", ftype=FeatureType.DISCRETE)
] + category_features
# Model descriptor consumed by the loader/encoder below; export path redacted.
ptos_prototype = Model(
    name='ptos_prototype',
    export_path="...",
    features=features,
)
print(ptos_prototype)
# BigQuery feature loader; COMPUTE_PROJECT is defined outside this excerpt.
cq_loader = BigQueryFeatureLoader(gcp_project=COMPUTE_PROJECT)
# The eight multilabel targets the model predicts.
labels = [
    "has_non_punitive_action",
    "has_punitive_action",
    "has_punitive_action_contains_self_harm",
    "has_punitive_action_encourage_self_harm",
    "has_punitive_action_episodic",
    "has_punitive_action_episodic_hateful_conduct",
    "has_punitive_action_other_abuse_policy",
    "has_punitive_action_without_self_harm"
]
# {{feature_names}} is escaped so the literal "{feature_names}" survives this
# f-string and is filled in later by the loader; trailing "..." is redacted SQL.
train_query = f"""
SELECT
{{feature_names}},
{",".join(labels)},
...
"""
val_query = f"""
SELECT
{{feature_names}},
{",".join(labels)},
...
"""
print(train_query)
# Dataset/table identifiers were redacted to empty strings in this export.
train = cq_loader.load_features(ptos_prototype, "", "", custom_query=train_query)
val = cq_loader.load_features(ptos_prototype, "", "", custom_query=val_query)
print(train.describe(model=ptos_prototype))
# Training hyperparameters.
params = {
    'max_seq_lengths': 128,
    'batch_size': 196,
    'lr': 1e-5,
    'optimizer_type': 'adamw',
    'warmup_steps': 0,
    'cls_dropout_rate': 0.1,
    'epochs': 30,
    'steps_per_epoch': 5000,
    'model_type': 'twitter_multilingual_bert_base_cased_mlm',
    'mixed_precision': True,
}
# Bare expression: notebook cell displaying the params dict.
params
def parse_labeled_data(row_dict):
    """Split a feature dict into (features, label_values) for tf.data.map.

    Pops each label column out of ``row_dict`` (mutating it in place) and
    returns the remaining features together with the popped label values,
    ordered like the module-level ``labels`` list.
    """
    extracted = []
    for label_name in labels:
        extracted.append(row_dict.pop(label_name))
    return row_dict, extracted
# Data-parallel training across all visible GPUs; the effective batch size
# scales with the replica count.
mirrored_strategy = tf.distribute.MirroredStrategy()
BATCH_SIZE = params['batch_size'] * mirrored_strategy.num_replicas_in_sync
train_ds = train.to_tf_dataset().map(parse_labeled_data).shuffle(BATCH_SIZE*100).batch(BATCH_SIZE).repeat()
val_ds = val.to_tf_dataset().map(parse_labeled_data).batch(BATCH_SIZE)
# Peek at one batch for a sanity check.
for record in train_ds:
    tf.print(record)
    break
def get_positive_weights():
    """Compute per-label positive-class weights from the training data.

    Fetches label counts via ``utils.get_label_weights`` and returns the
    positive-class weights, ordered by label name, as a float32 tensor.
    Used to counter class imbalance in the weighted multilabel loss.
    """
    weights_df = utils.get_label_weights(
        "tos-data-media-full",
        project_id="twttr-abusive-interact-prod",
        dataset_id="tos_policy"
    )
    ordered_weights = weights_df.sort_values(by='label').positive_class_weight
    return tf.cast(ordered_weights, dtype=tf.float32)
# Per-label positive-class weights used by the weighted multilabel loss below.
pos_weight_tensor = get_positive_weights()
print(pos_weight_tensor)
class TextEncoderPooledOutput(TextEncoder):
    """TextEncoder variant whose ``call`` returns only the pooled output.

    The parent encoder expects a list of inputs and returns a dict; this
    wrapper adapts a single input and extracts the "pooled_output" entry.
    """

    def call(self, x):
        encoder_outputs = super().call([x])
        return encoder_outputs["pooled_output"]

    def get_config(self):
        # No configuration beyond the parent encoder's.
        return super().get_config()
# Build the model under the distribution strategy so variables are mirrored
# across replicas.
# NOTE(review): indentation was lost in this export; the statements below are
# assumed to sit inside the strategy scope — confirm against the original.
with mirrored_strategy.scope():
    text_encoder_pooled_output = TextEncoderPooledOutput(
        params['max_seq_lengths'],
        model_type=params['model_type'],
        trainable=True
    )
    fe = FeatureEncoder(train)
    inputs, preprocessing_head = fe.build_model_head(model=ptos_prototype, text_encoder=text_encoder_pooled_output)
    cls_dropout = tf.keras.layers.Dropout(params['cls_dropout_rate'], name="cls_dropout")
    outputs = cls_dropout(preprocessing_head)
    # 8 logits, one per label; float32 output kept even under mixed precision.
    outputs = tf.keras.layers.Dense(8, name="output", dtype="float32")(outputs)
    model = tf.keras.Model(
        inputs=inputs,
        outputs=outputs
    )
pr_auc = tf.keras.metrics.AUC(curve="PR", num_thresholds=1000, multi_label=True, from_logits=True)
# Class-imbalance-weighted multilabel loss (weights computed above).
custom_loss = lambda y_true, y_pred: utils.multilabel_weighted_loss(y_true, y_pred, weights=pos_weight_tensor)
optimizer = create_optimizer(
    init_lr=params["lr"],
    num_train_steps=(params["epochs"] * params["steps_per_epoch"]),
    num_warmup_steps=params["warmup_steps"],
    optimizer_type=params["optimizer_type"],
)
if params.get("mixed_precision"):
    # NOTE(review): enable_mixed_precision_graph_rewrite is deprecated in
    # recent TF (replaced by tf.keras.mixed_precision); kept as-is here.
    optimizer = tf.train.experimental.enable_mixed_precision_graph_rewrite(optimizer)
model.compile(
    optimizer=optimizer,
    loss=custom_loss,
    metrics=[pr_auc]
)
# Notebook-cell inspection expressions.
model.weights
model.summary()
pr_auc.name
import getpass
import wandb
from wandb.keras import WandbCallback
# Start Weights & Biases experiment tracking; fall back to a disabled run when
# the key file is missing. Key and login arguments are redacted ("...").
try:
    wandb_key = ...
    wandb.login(...)
    run = wandb.init(project='ptos_with_media',
                     group='new-split-trains',
                     notes='tweet text with only (num_media, precision_nsfw). on full train set, new split.',
                     entity='absv',
                     config=params,
                     name='tweet-text-w-nsfw-1.1',
                     sync_tensorboard=True)
except FileNotFoundError:
    print('Wandb key not found')
    run = wandb.init(mode='disabled')
import datetime
import os
start_train_time = datetime.datetime.now()
print(start_train_time.strftime("%m-%d-%Y (%H:%M:%S)"))
# Checkpoint location redacted in this export.
checkpoint_path = os.path.join("...")
print("Saving model checkpoints here: ", checkpoint_path)
# Keep only the best checkpoint (by validation PR-AUC), one save per epoch.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join(checkpoint_path, "model.{epoch:04d}.tf"),
    verbose=1,
    monitor=f'val_{pr_auc.name}',
    mode='max',
    save_freq='epoch',
    save_best_only=True
)
early_stopping_callback = tf.keras.callbacks.EarlyStopping(patience=7,
                                                           monitor=f"val_{pr_auc.name}",
                                                           mode="max")
model.fit(train_ds, epochs=params["epochs"], validation_data=val_ds, callbacks=[cp_callback, early_stopping_callback],
          steps_per_epoch=params["steps_per_epoch"],
          verbose=2)
import tensorflow_hub as hub
# Reload a previously exported (v7) model from TF-Hub format for comparison;
# path redacted.
gs_model_path = ...
reloaded_keras_layer = hub.KerasLayer(gs_model_path)
inputs = tf.keras.layers.Input(name="tweet__core__tweet__text", shape=(1,), dtype=tf.string)
output = reloaded_keras_layer(inputs)
v7_model = tf.keras.models.Model(inputs=inputs, outputs=output)
pr_auc = tf.keras.metrics.AUC(curve="PR", name="pr_auc")
roc_auc = tf.keras.metrics.AUC(curve="ROC", name="roc_auc")
v7_model.compile(metrics=[pr_auc, roc_auc])
# Restore the candidate's best checkpoint (path redacted) and evaluate it.
model.load_weights("...")
candidate_model = model
with mirrored_strategy.scope():
    candidate_eval = candidate_model.evaluate(val_ds)
# Held-out test set; has_media / precision_nsfw are pulled for slicing below.
test_query = f"""
SELECT
{",".join(ptos_prototype.feature_names())},
has_media,
precision_nsfw,
{",".join(labels)},
...
"""
test = cq_loader.load_features(ptos_prototype, "", "", custom_query=test_query)
test = test.to_tf_dataset().map(parse_labeled_data)
print(test)
# Slices: media-only, confidently-NSFW, no-media, media-but-not-NSFW.
test_only_media = test.filter(lambda x, y: tf.equal(x["has_media"], True))
test_only_nsfw = test.filter(lambda x, y: tf.greater_equal(x["precision_nsfw"], 0.95))
test_no_media = test.filter(lambda x, y: tf.equal(x["has_media"], False))
test_media_not_nsfw = test.filter(lambda x, y: tf.logical_and(tf.equal(x["has_media"], True), tf.less(x["precision_nsfw"], 0.95)))
# Count examples per slice.
for d in [test, test_only_media, test_only_nsfw, test_no_media, test_media_not_nsfw]:
    print(d.reduce(0, lambda x, _: x + 1).numpy())
from notebook_eval_utils import SparseMultilabelEvaluator, EvalConfig
from dataclasses import asdict
def display_metrics(probs, targets, labels=labels):
    """Compute, display and return per-topic multilabel metrics.

    Evaluates ``probs`` against ``targets`` at a 0.5 decision threshold
    (precision@k target 0.9) under a full, all-ones evaluation mask, then
    shows the per-topic table and its median row.
    """
    eval_config = EvalConfig(prediction_threshold=0.5, precision_k=0.9)
    # Single "implicit" mode: every (example, label) pair is evaluated.
    for eval_mode, y_mask in [("implicit", np.ones(targets.shape))]:
        print("Evaluation mode", eval_mode)
        metrics = SparseMultilabelEvaluator.evaluate(
            targets, np.array(probs), y_mask, classes=labels, eval_config=eval_config
        )
    per_topic = asdict(metrics)["per_topic_metrics"]
    metrics_df = pd.DataFrame.from_dict(per_topic).transpose()
    # +1 in the denominator guards against topics with zero negatives.
    metrics_df["pos_to_neg"] = metrics_df["num_pos_samples"] / (metrics_df["num_neg_samples"] + 1)
    display(metrics_df.median())
    display(metrics_df)
    return metrics_df
def eval_model(model, df):
    """Predict over ``df`` under the distribution strategy and show metrics.

    Returns the per-topic metrics dataframe from ``display_metrics``.
    """
    with mirrored_strategy.scope():
        # Materialize targets before batching so ordering matches predictions.
        targets = np.stack(list(df.map(lambda x, y: y).as_numpy_iterator()), axis=0)
        batched = df.padded_batch(BATCH_SIZE)
        predictions = model.predict(batched)
    return display_metrics(predictions, targets)
# Evaluate the candidate model on every test slice.
subsets = {"test": test,
           "test_only_media": test_only_media,
           "test_only_nsfw": test_only_nsfw,
           "test_no_media": test_no_media,
           "test_media_not_nsfw": test_media_not_nsfw}
metrics = {}
for name, df in subsets.items():
    metrics[name] = eval_model(candidate_model, df)
# Per-slice PR-AUC summary (notebook display expression).
[(name, m.pr_auc) for name, m in metrics.items()]
# Tab-separated dump of the per-slice PR-AUC values, e.g. for spreadsheets.
for name, x in [(name, m.pr_auc.to_string(index=False).strip().split("\n")) for name, m in metrics.items()]:
    print(name)
    for y in x:
        print(y.strip(), end="\t")
    print(".")
# Re-count examples per slice.
for d in [test, test_only_media, test_only_nsfw, test_no_media, test_media_not_nsfw]:
    print(d.reduce(0, lambda x, _: x + 1).numpy())

View File

@ -1,466 +0,0 @@
import kerastuner as kt
import math
import numpy as np
import pandas as pd
import random
import sklearn.metrics
import tensorflow as tf
import os
import glob
from tqdm import tqdm
from matplotlib import pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from google.cloud import storage
# Restrict TensorFlow to the second GPU only.
physical_devices = tf.config.list_physical_devices('GPU')
physical_devices
tf.config.set_visible_devices([tf.config.PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU')], 'GPU')
tf.config.get_visible_devices('GPU')
def decode_fn_embedding(example_proto):
    """Parse one serialized tf.train.Example into its embedding and labels.

    Expects a 256-dim float "embedding" feature and a scalar int64 "labels"
    feature; returns the parsed feature dict.
    """
    schema = {
        "embedding": tf.io.FixedLenFeature([256], dtype=tf.float32),
        "labels": tf.io.FixedLenFeature([], dtype=tf.int64),
    }
    return tf.io.parse_single_example(example_proto, schema)
def preprocess_embedding_example(example_dict, positive_label=1, features_as_dict=False):
    """Turn a decoded example into a (features, binary_label) pair.

    The label is 1 when the example's "labels" value equals
    ``positive_label``, else 0. Features are the raw embedding tensor, or a
    single-key dict when ``features_as_dict`` is set.
    """
    is_positive = tf.math.reduce_any(example_dict["labels"] == positive_label)
    label = tf.cast(is_positive, tf.int32)
    embedding = example_dict["embedding"]
    features = {"embedding": embedding} if features_as_dict else embedding
    return features, label
# Data locations are redacted; "sens_prev" presumably means the sensitive-
# prevalence dataset split — confirm against the data pipeline.
input_root = ...
sens_prev_input_root = ...
use_sens_prev_data = True
has_validation_data = True
positive_label = 1
train_batch_size = 256
test_batch_size = 256
validation_batch_size = 256
# Rejection resampling to a 50/50 class balance is off by default.
do_resample = False
def class_func(features, label):
    """Class-key function for rejection resampling: the label is the class."""
    del features  # unused; the rejection_resample API requires this signature
    return label
# Optional 50/50 rejection-resampling transform for the training stream.
resample_fn = tf.data.experimental.rejection_resample(
    class_func, target_dist = [0.5, 0.5], seed=0
)
train_glob = f"{input_root}/train/tfrecord/*.tfrecord"
train_files = tf.io.gfile.glob(train_glob)
if use_sens_prev_data:
    train_sens_prev_glob = f"{sens_prev_input_root}/train/tfrecord/*.tfrecord"
    train_sens_prev_files = tf.io.gfile.glob(train_sens_prev_glob)
    train_files = train_files + train_sens_prev_files
    # NOTE(review): indentation was lost in this export; the shuffle is assumed
    # to belong to this branch — confirm against the original notebook.
    random.shuffle(train_files)
if not len(train_files):
    raise ValueError(f"Did not find any train files matching {train_glob}")
test_glob = f"{input_root}/test/tfrecord/*.tfrecord"
test_files = tf.io.gfile.glob(test_glob)
if not len(test_files):
    raise ValueError(f"Did not find any eval files matching {test_glob}")
test_ds = tf.data.TFRecordDataset(test_files).map(decode_fn_embedding)
test_ds = test_ds.map(lambda x: preprocess_embedding_example(x, positive_label=positive_label)).batch(batch_size=test_batch_size)
if use_sens_prev_data:
    test_sens_prev_glob = f"{sens_prev_input_root}/test/tfrecord/*.tfrecord"
    test_sens_prev_files = tf.io.gfile.glob(test_sens_prev_glob)
    if not len(test_sens_prev_files):
        raise ValueError(f"Did not find any eval files matching {test_sens_prev_glob}")
    test_sens_prev_ds = tf.data.TFRecordDataset(test_sens_prev_files).map(decode_fn_embedding)
    test_sens_prev_ds = test_sens_prev_ds.map(lambda x: preprocess_embedding_example(x, positive_label=positive_label)).batch(batch_size=test_batch_size)
train_ds = tf.data.TFRecordDataset(train_files).map(decode_fn_embedding)
train_ds = train_ds.map(lambda x: preprocess_embedding_example(x, positive_label=positive_label))
if do_resample:
    # rejection_resample yields (class, example) pairs; keep only the example.
    train_ds = train_ds.apply(resample_fn).map(lambda _,b:(b))
train_ds = train_ds.batch(batch_size=256).shuffle(buffer_size=10)
train_ds = train_ds.repeat()
if has_validation_data:
    eval_glob = f"{input_root}/validation/tfrecord/*.tfrecord"
    eval_files = tf.io.gfile.glob(eval_glob)
    if use_sens_prev_data:
        eval_sens_prev_glob = f"{sens_prev_input_root}/validation/tfrecord/*.tfrecord"
        eval_sens_prev_files = tf.io.gfile.glob(eval_sens_prev_glob)
        eval_files = eval_files + eval_sens_prev_files
    if not len(eval_files):
        raise ValueError(f"Did not find any eval files matching {eval_glob}")
    eval_ds = tf.data.TFRecordDataset(eval_files).map(decode_fn_embedding)
    eval_ds = eval_ds.map(lambda x: preprocess_embedding_example(x, positive_label=positive_label)).batch(batch_size=validation_batch_size)
else:
    # No validation split: fall back to the test files for validation.
    eval_ds = tf.data.TFRecordDataset(test_files).map(decode_fn_embedding)
    eval_ds = eval_ds.map(lambda x: preprocess_embedding_example(x, positive_label=positive_label)).batch(batch_size=validation_batch_size)
# Single pass over the raw training records to count examples and positives.
check_ds = tf.data.TFRecordDataset(train_files).map(decode_fn_embedding)
cnt = 0
pos_cnt = 0
for example in tqdm(check_ds):
    label = example['labels']
    if label == 1:
        pos_cnt += 1
    cnt += 1
print(f'{cnt} train entries with {pos_cnt} positive')
# Evaluation metrics: precision at 90% recall, and PR-AUC.
metrics = []
metrics.append(
    tf.keras.metrics.PrecisionAtRecall(
        recall=0.9, num_thresholds=200, class_id=None, name=None, dtype=None
    )
)
metrics.append(
    tf.keras.metrics.AUC(
        num_thresholds=200,
        curve="PR",
    )
)
def build_model(hp):
    """Build and compile a Keras-Tuner candidate MLP over 256-d embeddings.

    Searched hyperparameters: activation, kernel initializer, number of
    hidden layers (1-2) and units per layer (128 or 256). Each hidden layer
    is preceded by batch normalization; the head is a single sigmoid unit.
    Compiled with Adam, binary cross-entropy and the module-level ``metrics``.
    """
    optimizer = tf.keras.optimizers.Adam(
        learning_rate=0.001,
        beta_1=0.9,
        beta_2=0.999,
        epsilon=1e-08,
        amsgrad=False,
        name="Adam",
    )
    activation = hp.Choice("activation", ["tanh", "gelu"])
    kernel_initializer = hp.Choice("kernel_initializer", ["he_uniform", "glorot_uniform"])

    model = Sequential()
    for layer_idx in range(hp.Int("num_layers", 1, 2)):
        model.add(tf.keras.layers.BatchNormalization())
        units = hp.Int("units", min_value=128, max_value=256, step=128)
        dense_kwargs = dict(
            units=units,
            activation=activation,
            kernel_initializer=kernel_initializer,
        )
        if layer_idx == 0:
            # The first layer fixes the (None, 256) embedding input shape.
            dense_kwargs["input_shape"] = (None, 256)
        model.add(Dense(**dense_kwargs))
    model.add(Dense(1, activation='sigmoid', kernel_initializer=kernel_initializer))
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=metrics)
    return model
# Bayesian hyperparameter search over the MLP defined in build_model,
# minimizing validation loss.
tuner = kt.tuners.BayesianOptimization(
    build_model,
    objective=kt.Objective('val_loss', direction="min"),
    max_trials=30,
    directory='tuner_dir',
    project_name='with_twitter_clip')
callbacks = [tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', min_delta=0, patience=5, verbose=0,
    mode='auto', baseline=None, restore_best_weights=True
)]
steps_per_epoch = 400
tuner.search(train_ds,
             epochs=100,
             batch_size=256,
             steps_per_epoch=steps_per_epoch,
             verbose=2,
             validation_data=eval_ds,
             callbacks=callbacks)
tuner.results_summary()
# Take the best architecture, recompile it, and refit with a longer
# early-stopping patience (10 vs 5 during search).
models = tuner.get_best_models(num_models=2)
best_model = models[0]
best_model.build(input_shape=(None, 256))
best_model.summary()
tuner.get_best_hyperparameters()[0].values
optimizer = tf.keras.optimizers.Adam(
    learning_rate=0.001,
    beta_1=0.9,
    beta_2=0.999,
    epsilon=1e-08,
    amsgrad=False,
    name="Adam",
)
best_model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=metrics)
best_model.summary()
callbacks = [tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', min_delta=0, patience=10, verbose=0,
    mode='auto', baseline=None, restore_best_weights=True
)]
history = best_model.fit(train_ds, epochs=100, validation_data=eval_ds, steps_per_epoch=steps_per_epoch, callbacks=callbacks)
model_name = 'twitter_hypertuned'
model_path = f'models/nsfw_Keras_with_CLIP_{model_name}'
tf.keras.models.save_model(best_model, model_path)
def copy_local_directory_to_gcs(local_path, bucket, gcs_path):
    """Recursively copy a directory of files to GCS.

    local_path should be a directory and not have a trailing slash.

    Files become blobs under ``gcs_path``; subdirectories are recursed into,
    preserving their names.
    """
    assert os.path.isdir(local_path)
    for entry in glob.glob(local_path + '/**'):
        if os.path.isfile(entry):
            # Remote key: gcs_path plus the path relative to local_path.
            remote_path = os.path.join(gcs_path, entry[1 + len(local_path):])
            bucket.blob(remote_path).upload_from_filename(entry)
        else:
            dir_name = os.path.basename(os.path.normpath(entry))
            copy_local_directory_to_gcs(entry, bucket, f"{gcs_path}/{dir_name}")
# Upload the saved model and tuner state to GCS (project/bucket redacted).
client = storage.Client(project=...)
bucket = client.get_bucket(...)
copy_local_directory_to_gcs(model_path, bucket, model_path)
copy_local_directory_to_gcs('tuner_dir', bucket, 'tuner_dir')
# Round-trip the saved model to confirm it loads.
loaded_model = tf.keras.models.load_model(model_path)
print(history.history.keys())
# Training-history curves: AUC, loss, and precision@0.9-recall.
plt.figure(figsize = (20, 5))
plt.subplot(1, 3, 1)
plt.plot(history.history['auc'])
plt.plot(history.history['val_auc'])
plt.title('model auc')
plt.ylabel('auc')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.subplot(1, 3, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.subplot(1, 3, 3)
plt.plot(history.history['precision_at_recall'])
plt.plot(history.history['val_precision_at_recall'])
plt.title('model precision at 0.9 recall')
plt.ylabel('precision_at_recall')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.savefig('history_with_twitter_clip.pdf')
# Collect labels and predicted probabilities over the MU test set.
# FIX: `Model.predict_proba` was deprecated and removed from tf.keras
# (TF >= 2.6); `predict` returns the same sigmoid probabilities here.
test_labels = []
test_preds = []
for batch_features, batch_labels in tqdm(test_ds):
    test_preds.extend(loaded_model.predict(batch_features))
    test_labels.extend(batch_labels.numpy())
# Same collection over the sensitive-prevalence test set.
test_sens_prev_labels = []
test_sens_prev_preds = []
for batch_features, batch_labels in tqdm(test_sens_prev_ds):
    test_sens_prev_preds.extend(loaded_model.predict(batch_features))
    test_sens_prev_labels.extend(batch_labels.numpy())
# Label counts for the MU test set.
n_test_pos = 0
n_test_neg = 0
n_test = 0
for label in test_labels:
    n_test +=1
    if label == 1:
        n_test_pos +=1
    else:
        n_test_neg +=1
print(f'n_test = {n_test}, n_pos = {n_test_pos}, n_neg = {n_test_neg}')
# Label counts for the sensitive-prevalence test set.
n_test_sens_prev_pos = 0
n_test_sens_prev_neg = 0
n_test_sens_prev = 0
for label in test_sens_prev_labels:
    n_test_sens_prev +=1
    if label == 1:
        n_test_sens_prev_pos +=1
    else:
        n_test_sens_prev_neg +=1
print(f'n_test_sens_prev = {n_test_sens_prev}, n_pos_sens_prev = {n_test_sens_prev_pos}, n_neg = {n_test_sens_prev_neg}')
# Precision-recall curve + AUC on the MU test set.
# pr is (precision, recall, thresholds); weights are computed but unused here.
test_weights = np.ones(np.asarray(test_preds).shape)
test_labels = np.asarray(test_labels)
test_preds = np.asarray(test_preds)
test_weights = np.asarray(test_weights)
pr = sklearn.metrics.precision_recall_curve(
    test_labels,
    test_preds)
auc = sklearn.metrics.auc(pr[1], pr[0])
plt.plot(pr[1], pr[0])
plt.title("nsfw (MU test set)")
# Precision-recall curve + AUC on the sens-prev test set.
test_sens_prev_weights = np.ones(np.asarray(test_sens_prev_preds).shape)
test_sens_prev_labels = np.asarray(test_sens_prev_labels)
test_sens_prev_preds = np.asarray(test_sens_prev_preds)
test_sens_prev_weights = np.asarray(test_sens_prev_weights)
pr_sens_prev = sklearn.metrics.precision_recall_curve(
    test_sens_prev_labels,
    test_sens_prev_preds)
auc_sens_prev = sklearn.metrics.auc(pr_sens_prev[1], pr_sens_prev[0])
plt.plot(pr_sens_prev[1], pr_sens_prev[0])
plt.title("nsfw (sens prev test set)")
# Score-distribution histogram for the Keras predictions.
df = pd.DataFrame(
    {
        "label": test_labels.squeeze(),
        "preds_keras": np.asarray(test_preds).flatten(),
    })
plt.figure(figsize=(15, 10))
df["preds_keras"].hist()
plt.title("Keras predictions", size=20)
plt.xlabel('score')
plt.ylabel("freq")
# Precision and recall vs decision threshold, plus the PR curve
# (pr[0]=precision, pr[1]=recall have one more entry than pr[2]=thresholds).
plt.figure(figsize = (20, 5))
plt.subplot(1, 3, 1)
plt.plot(pr[2], pr[0][0:-1])
plt.xlabel("threshold")
plt.ylabel("precision")
plt.subplot(1, 3, 2)
plt.plot(pr[2], pr[1][0:-1])
plt.xlabel("threshold")
plt.ylabel("recall")
plt.title("Keras", size=20)
plt.subplot(1, 3, 3)
plt.plot(pr[1], pr[0])
plt.xlabel("recall")
plt.ylabel("precision")
plt.savefig('with_twitter_clip.pdf')
def get_point_for_recall(recall_value, recall, precision):
    """Return the (recall, precision) pair whose recall is closest to ``recall_value``."""
    closest = int(np.argmin(np.abs(recall - recall_value)))
    return (recall[closest], precision[closest])
def get_point_for_precision(precision_value, recall, precision):
    """Return the (recall, precision) pair whose precision is closest to ``precision_value``."""
    closest = int(np.argmin(np.abs(precision - precision_value)))
    return (recall[closest], precision[closest])
# PR curve for the MU test set, with operating points marked at r=0.5 (red)
# and r=0.9 (blue).
precision, recall, thresholds = pr
auc_precision_recall = sklearn.metrics.auc(recall, precision)
print(auc_precision_recall)
plt.figure(figsize=(15, 10))
plt.plot(recall, precision)
plt.xlabel("recall")
plt.ylabel("precision")
ptAt50 = get_point_for_recall(0.5, recall, precision)
print(ptAt50)
plt.plot( [ptAt50[0],ptAt50[0]], [0,ptAt50[1]], 'r')
plt.plot([0, ptAt50[0]], [ptAt50[1], ptAt50[1]], 'r')
ptAt90 = get_point_for_recall(0.9, recall, precision)
print(ptAt90)
plt.plot( [ptAt90[0],ptAt90[0]], [0,ptAt90[1]], 'b')
plt.plot([0, ptAt90[0]], [ptAt90[1], ptAt90[1]], 'b')
# Format AUC and the two operating points for the plot title.
ptAt50fmt = "%.4f" % ptAt50[1]
ptAt90fmt = "%.4f" % ptAt90[1]
aucFmt = "%.4f" % auc_precision_recall
# FIX: the original had `N_train={...}}` — the stray `}` after the replacement
# field is a SyntaxError in an f-string (a lone `}` is not allowed). Changed to
# `{...}`, matching the equivalent sens-prev title below. The `...` placeholders
# are redacted train-set counts.
plt.title(
    f"Keras (nsfw MU test)\nAUC={aucFmt}\np={ptAt50fmt} @ r=0.5\np={ptAt90fmt} @ r=0.9\nN_train={...} ({...} pos), N_test={n_test} ({n_test_pos} pos)",
    size=20
)
plt.subplots_adjust(top=0.72)
plt.savefig('recall_precision_nsfw_Keras_with_twitter_CLIP_MU_test.pdf')
# Same PR plot for the sensitive-prevalence test set, with operating points
# at r=0.5 (red) and r=0.9 (blue).
precision, recall, thresholds = pr_sens_prev
auc_precision_recall = sklearn.metrics.auc(recall, precision)
print(auc_precision_recall)
plt.figure(figsize=(15, 10))
plt.plot(recall, precision)
plt.xlabel("recall")
plt.ylabel("precision")
ptAt50 = get_point_for_recall(0.5, recall, precision)
print(ptAt50)
plt.plot( [ptAt50[0],ptAt50[0]], [0,ptAt50[1]], 'r')
plt.plot([0, ptAt50[0]], [ptAt50[1], ptAt50[1]], 'r')
ptAt90 = get_point_for_recall(0.9, recall, precision)
print(ptAt90)
plt.plot( [ptAt90[0],ptAt90[0]], [0,ptAt90[1]], 'b')
plt.plot([0, ptAt90[0]], [ptAt90[1], ptAt90[1]], 'b')
ptAt50fmt = "%.4f" % ptAt50[1]
ptAt90fmt = "%.4f" % ptAt90[1]
aucFmt = "%.4f" % auc_precision_recall
# The `...` placeholders are redacted train-set counts.
plt.title(
    f"Keras (nsfw sens prev test)\nAUC={aucFmt}\np={ptAt50fmt} @ r=0.5\np={ptAt90fmt} @ r=0.9\nN_train={...} ({...} pos), N_test={n_test_sens_prev} ({n_test_sens_prev_pos} pos)",
    size=20
)
plt.subplots_adjust(top=0.72)
plt.savefig('recall_precision_nsfw_Keras_with_twitter_CLIP_sens_prev_test.pdf')

View File

@ -1,152 +0,0 @@
from datetime import datetime
from functools import reduce
import os
import pandas as pd
import re
from sklearn.metrics import average_precision_score, classification_report, precision_recall_curve, PrecisionRecallDisplay
from sklearn.model_selection import train_test_split
import tensorflow as tf
import matplotlib.pyplot as plt
import re
from twitter.cuad.representation.models.optimization import create_optimizer
from twitter.cuad.representation.models.text_encoder import TextEncoder
pd.set_option('display.max_colwidth', None)
pd.set_option('display.expand_frame_repr', False)
print(tf.__version__)
print(tf.config.list_physical_devices())
# Per-run log directory, timestamped at notebook start.
log_path = os.path.join('pnsfwtweettext_model_runs', datetime.now().strftime('%Y-%m-%d_%H.%M.%S'))
tweet_text_feature = 'text'
params = {
    'batch_size': 32,
    'max_seq_lengths': 256,
    'model_type': 'twitter_bert_base_en_uncased_augmented_mlm',
    'trainable_text_encoder': True,
    'lr': 5e-5,
    'epochs': 10,
}
# Tweet artifacts stripped before training: retweet headers, @mentions,
# t.co links, and the literal "@?????" placeholder.
REGEX_PATTERNS = [
    r'^RT @[A-Za-z0-9_]+: ',
    r"@[A-Za-z0-9_]+",
    r'https:\/\/t\.co\/[A-Za-z0-9]{10}',
    r'@\?\?\?\?\?',
]
# Single capture group over common emoji codepoint ranges; clean_tweet pads
# each match with spaces via the \1 backreference.
EMOJI_PATTERN = re.compile(
    "(["
    "\U0001F1E0-\U0001F1FF"
    "\U0001F300-\U0001F5FF"
    "\U0001F600-\U0001F64F"
    "\U0001F680-\U0001F6FF"
    "\U0001F700-\U0001F77F"
    "\U0001F780-\U0001F7FF"
    "\U0001F800-\U0001F8FF"
    "\U0001F900-\U0001F9FF"
    "\U0001FA00-\U0001FA6F"
    "\U0001FA70-\U0001FAFF"
    "\U00002702-\U000027B0"
    "])"
)
def clean_tweet(text):
    """Normalize a raw tweet for the NSFW-text model.

    Strips RT headers, @mentions, t.co links and placeholder handles
    (module-level REGEX_PATTERNS), pads each emoji with spaces so it
    tokenizes separately, collapses newlines, then trims and lowercases.
    """
    cleaned = text
    for pattern in REGEX_PATTERNS:
        cleaned = re.sub(pattern, '', cleaned)
    cleaned = EMOJI_PATTERN.sub(r' \1 ', cleaned)
    cleaned = cleaned.replace('\n', ' ')
    return cleaned.strip().lower()
# NOTE(review): `df` is loaded in a cell omitted from this export; it appears
# to hold `text` and `is_nsfw` columns — confirm upstream.
df['processed_text'] = df['text'].astype(str).map(clean_tweet)
df.sample(10)
# 90/10 train/validation split on the cleaned text.
X_train, X_val, y_train, y_val = train_test_split(df[['processed_text']], df['is_nsfw'], test_size=0.1, random_state=1)
def df_to_ds(X, y, shuffle=False):
    """Build a batched tf.data.Dataset of ({text_feature: text}, one_hot_label).

    Labels are one-hot encoded to depth 2; shuffling (buffer 1000, fixed
    seed) is optional and reshuffles每 iteration when enabled.
    """
    one_hot_labels = tf.one_hot(tf.cast(y.values, tf.int32), depth=2, axis=-1)
    ds = tf.data.Dataset.from_tensor_slices((X.values, one_hot_labels))
    if shuffle:
        ds = ds.shuffle(1000, seed=1, reshuffle_each_iteration=True)
    ds = ds.map(lambda text, label: ({ tweet_text_feature: text }, label))
    return ds.batch(params['batch_size'])
ds_train = df_to_ds(X_train, y_train, shuffle=True)
ds_val = df_to_ds(X_val, y_val)
X_train.values
# BERT text encoder over the raw tweet text, with a 2-way softmax head.
inputs = tf.keras.layers.Input(shape=(), dtype=tf.string, name=tweet_text_feature)
encoder = TextEncoder(
    max_seq_lengths=params['max_seq_lengths'],
    model_type=params['model_type'],
    trainable=params['trainable_text_encoder'],
    local_preprocessor_path='demo-preprocessor'
)
embedding = encoder([inputs])["pooled_output"]
predictions = tf.keras.layers.Dense(2, activation='softmax')(embedding)
model = tf.keras.models.Model(inputs=inputs, outputs=predictions)
model.summary()
optimizer = create_optimizer(
    params['lr'],
    params['epochs'] * len(ds_train),
    0,
    weight_decay_rate=0.01,
    optimizer_type='adamw'
)
# Softmax outputs are already probabilities, hence from_logits=False.
bce = tf.keras.losses.BinaryCrossentropy(from_logits=False)
pr_auc = tf.keras.metrics.AUC(curve='PR', num_thresholds=1000, from_logits=False)
model.compile(optimizer=optimizer, loss=bce, metrics=[pr_auc])
callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        mode='min',
        patience=1,
        restore_best_weights=True
    ),
    tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(log_path, 'checkpoints', '{epoch:02d}'),
        save_freq='epoch'
    ),
    tf.keras.callbacks.TensorBoard(
        log_dir=os.path.join(log_path, 'scalars'),
        update_freq='batch',
        write_graph=False
    )
]
history = model.fit(
    ds_train,
    epochs=params['epochs'],
    callbacks=callbacks,
    validation_data=ds_val,
    steps_per_epoch=len(ds_train)
)
# Smoke-test prediction on an obviously NSFW-leaning string.
model.predict(["xxx 🍑"])
# NOTE(review): `apply_model` is defined in a cell omitted from this export;
# presumably it scores a single text via the model — confirm upstream.
preds = X_val.processed_text.apply(apply_model)
print(classification_report(y_val, preds >= 0.90, digits=4))
precision, recall, thresholds = precision_recall_curve(y_val, preds)
fig = plt.figure(figsize=(15, 10))
# NOTE(review): arguments look swapped relative to the axis labels
# (plot(precision, recall) with xlabel "Recall") — kept as-is; verify intent.
plt.plot(precision, recall, lw=2)
plt.grid()
plt.xlim(0.2, 1)
plt.ylim(0.3, 1)
plt.xlabel("Recall", size=20)
plt.ylabel("Precision", size=20)
average_precision_score(y_val, preds)

View File

@ -1,118 +0,0 @@
from abc import ABC
import re
from toxicity_ml_pipeline.settings.hcomp_settings import TOXIC_35
import numpy as np
# Set form for O(1) membership tests; TOXIC_35 presumably lists the v3.5
# toxic annotation values — confirm in hcomp_settings.
TOXIC_35_set = set(TOXIC_35)
# URL and @mention capture groups shared by the compiled patterns below.
url_group = r"(\bhttps?:\/\/\S+)"
mention_group = r"(\B@\S+)"
urls_mentions_re = re.compile(url_group + r"|" + mention_group, re.IGNORECASE)
url_re = re.compile(url_group, re.IGNORECASE)
mention_re = re.compile(mention_group, re.IGNORECASE)
newline_re = re.compile(r"\n+", re.IGNORECASE)
# HTML-escaped ampersands, including spaced variants like "& amp ;".
and_re = re.compile(r"&\s?amp\s?;", re.IGNORECASE)
class DataframeCleaner(ABC):
    """Base pipeline for cleaning a tweet dataframe.

    Calling an instance runs, in order: preserve the raw text, subclass text
    cleaning (``_clean``), systematic preprocessing (media removal and text
    deduplication), then subclass post-processing (``_postprocess``).
    """

    def __init__(self):
        pass

    def _clean(self, df):
        # Default: no text cleaning; subclasses override.
        return df

    def _systematic_preprocessing(self, df):
        # Drop media tweets (when the column exists), then dedup on text,
        # keeping the most recent occurrence.
        df.reset_index(inplace=True, drop=True)
        if "media_url" not in df.columns:
            print("WARNING you are not removing tweets with media to train a BERT model.")
        else:
            print(".... removing tweets with media")
            df.drop(df[~df.media_url.isna()].index, inplace=True, axis=0)
        print(".... deleting duplicates")
        df.drop_duplicates("text", inplace=True, keep="last")
        print(f"Got {df.shape[0]} after cleaning")
        return df.reset_index(inplace=False, drop=True)

    def _postprocess(self, df, *args, **kwargs):
        # Default: no post-processing; subclasses override.
        return df

    def __call__(self, df, *args, **kwargs):
        print(f"Got {df.shape[0]} before cleaning")
        # Keep the untouched text so cleaning stays reversible/inspectable.
        df["raw_text"] = df.text
        cleaned = self._clean(df)
        cleaned = self._systematic_preprocessing(cleaned)
        return self._postprocess(cleaned, *args, **kwargs)
def mapping_func(el):
    """Map a labeled row to a 3-class target.

    2 = content in the TOXIC_35 set, 1 = otherwise toxic (label == 1),
    0 = neither.
    """
    if el.aggregated_content in TOXIC_35_set:
        return 2
    return 1 if el.label == 1 else 0
class DefaultENNoPreprocessor(DataframeCleaner):
    """English cleaner with label/weight post-processing but no text cleaning."""

    def _postprocess(self, df, *args, **kwargs):
        # Derive the annotator vote share and agreement rate when the raw
        # count columns are present.
        if "toxic_count" in df.columns and "non_toxic_count" in df.columns:
            df["vote"] = df.toxic_count / (df.toxic_count + df.non_toxic_count)
            df["agreement_rate"] = np.max((df.vote, 1 - df.vote), axis=0)
        # Optionally re-derive the "label" column from another column.
        if "label_column" in kwargs and kwargs["label_column"] != "label":
            if kwargs["label_column"] == "aggregated_content":
                # v3.5 labels: binary (in/out of TOXIC_35) or 3-class via
                # mapping_func, depending on num_classes.
                print("Replacing v3 label by v3.5 label.")
                if "num_classes" in kwargs and kwargs["num_classes"] < 3:
                    df["label"] = np.where(df.aggregated_content.isin(TOXIC_35_set), 1, 0)
                elif "num_classes" in kwargs and kwargs["num_classes"] == 3:
                    print("Making it a 3-class pb")
                    df["label"] = df.apply(mapping_func, axis=1)
                else:
                    raise NotImplementedError
            elif kwargs['label_column'] in df.columns:
                df['label'] = df[kwargs['label_column']]
                # Binary class weighting: positives get 1 - w, negatives w.
                if kwargs['class_weight'] is not None:
                    df["class_weight"] = np.where(df['label'] == 1, 1-kwargs['class_weight'],
                        kwargs['class_weight'])
            else:
                raise NotImplementedError
        # NOTE(review): this branch drops low-agreement rows and then raises
        # NotImplementedError — it looks like a deliberately disabled or
        # unfinished code path; confirm before enabling filter_low_agreements.
        if "filter_low_agreements" in kwargs and kwargs["filter_low_agreements"] == True:
            df.drop(df[(df.agreement_rate <= 0.6)].index, axis=0, inplace=True)
            raise NotImplementedError
        return df
class DefaultENPreprocessor(DefaultENNoPreprocessor):
    """English preprocessor: placeholder-substitutes URLs and @mentions,
    collapses newlines, and unescapes ampersands before post-processing."""

    def _clean(self, adhoc_df):
        print(
            ".... removing \\n and replacing @mentions and URLs by placeholders. "
            "Emoji filtering is not done."
        )
        texts = [url_re.sub("URL", tweet) for tweet in adhoc_df.raw_text.values]
        texts = [mention_re.sub("MENTION", tweet) for tweet in texts]
        # Collapse newline runs and trim leading/trailing spaces.
        texts = [newline_re.sub(" ", tweet).strip(" ") for tweet in texts]
        adhoc_df["text"] = [and_re.sub("&", tweet) for tweet in texts]
        return adhoc_df
class Defaulti18nPreprocessor(DataframeCleaner):
    """Language-agnostic preprocessor: removes URLs and @mentions outright
    and collapses newlines."""

    def _clean(self, adhoc_df):
        print(".... removing @mentions, \\n and URLs. Emoji filtering is not done.")
        stripped = [urls_mentions_re.sub("", tweet) for tweet in adhoc_df.raw_text.values]
        # Collapse newline runs and trim leading/trailing spaces.
        adhoc_df["text"] = [newline_re.sub(" ", tweet).strip(" ") for tweet in stripped]
        return adhoc_df

View File

@ -1,348 +0,0 @@
from abc import ABC, abstractmethod
from datetime import date
from importlib import import_module
import pickle
from toxicity_ml_pipeline.settings.default_settings_tox import (
CLIENT,
EXISTING_TASK_VERSIONS,
GCS_ADDRESS,
TRAINING_DATA_LOCATION,
)
from toxicity_ml_pipeline.utils.helpers import execute_command, execute_query
from toxicity_ml_pipeline.utils.queries import (
FULL_QUERY,
FULL_QUERY_W_TWEET_TYPES,
PARSER_UDF,
QUERY_SETTINGS,
)
import numpy as np
import pandas
class DataframeLoader(ABC):
    """Abstract base for dataframe loaders bound to a GCP project."""

    def __init__(self, project):
        # GCP project the queries will run against.
        self.project = project

    @abstractmethod
    def produce_query(self):
        """Build the SQL query that fetches the data."""

    @abstractmethod
    def load_data(self, test=False):
        """Execute the query/queries and return a dataframe."""
class ENLoader(DataframeLoader):
    """Loads English toxicity training data from BigQuery (with a GCS
    pickle fallback for cached intermediate files)."""

    def __init__(self, project, setting_file):
        super(ENLoader, self).__init__(project=project)
        # Date window and task version come from the settings module.
        self.date_begin = setting_file.DATE_BEGIN
        self.date_end = setting_file.DATE_END
        TASK_VERSION = setting_file.TASK_VERSION
        if TASK_VERSION not in EXISTING_TASK_VERSIONS:
            raise ValueError
        self.task_version = TASK_VERSION
        # Copy so per-instance mutations (e.g. test LIMIT) don't leak into
        # the shared QUERY_SETTINGS constant.
        self.query_settings = dict(QUERY_SETTINGS)
        self.full_query = FULL_QUERY

    def produce_query(self, date_begin, date_end, task_version=None, **keys):
        # Returns "" when no table exists for the requested task version;
        # callers skip empty queries.
        task_version = self.task_version if task_version is None else task_version
        if task_version in keys["table"]:
            table_name = keys["table"][task_version]
            print(f"Loading {table_name}")
            main_query = keys["main"].format(
                table=table_name,
                parser_udf=PARSER_UDF[task_version],
                date_begin=date_begin,
                date_end=date_end,
            )
            return self.full_query.format(
                main_table_query=main_query, date_begin=date_begin, date_end=date_end
            )
        return ""

    def _reload(self, test, file_keyword):
        # Try BigQuery first; on failure fall back to a pickled copy in GCS.
        # In test mode only a random 1000-row sample is returned.
        query = f"SELECT * from `{TRAINING_DATA_LOCATION.format(project=self.project)}_{file_keyword}`"
        if test:
            query += " ORDER BY RAND() LIMIT 1000"
        try:
            df = execute_query(client=CLIENT, query=query)
        except Exception:
            print(
                "Loading from BQ failed, trying to load from GCS. "
                "NB: use this option only for intermediate files, which will be deleted at the end of "
                "the project."
            )
            copy_cmd = f"gsutil cp {GCS_ADDRESS.format(project=self.project)}/training_data/{file_keyword}.pkl ."
            execute_command(copy_cmd)
            try:
                with open(f"{file_keyword}.pkl", "rb") as file:
                    df = pickle.load(file)
            except Exception:
                # Neither source worked; caller falls through to fresh queries.
                return None
        if test:
            # Shuffle before truncating so the GCS path is sampled too.
            df = df.sample(frac=1)
            return df.iloc[:1000]
        return df

    def load_data(self, test=False, **kwargs):
        # Optional cache reload via kwargs["reload"] (a file keyword).
        if "reload" in kwargs and kwargs["reload"]:
            df = self._reload(test, kwargs["reload"])
            if df is not None and df.shape[0] > 0:
                return df
        df = None
        query_settings = self.query_settings
        if test:
            # Test mode: restrict to the fairness table, capped at 500 rows.
            query_settings = {"fairness": self.query_settings["fairness"]}
            query_settings["fairness"]["main"] += " LIMIT 500"
        # Run every configured table query and concatenate, tagging each row
        # with its source table in "origin".
        for table, query_info in query_settings.items():
            curr_query = self.produce_query(
                date_begin=self.date_begin, date_end=self.date_end, **query_info
            )
            if curr_query == "":
                continue
            curr_df = execute_query(client=CLIENT, query=curr_query)
            curr_df["origin"] = table
            df = curr_df if df is None else pandas.concat((df, curr_df))
        df["loading_date"] = date.today()
        df["date"] = pandas.to_datetime(df.date)
        return df

    def load_precision_set(
        self, begin_date="...", end_date="...", with_tweet_types=False, task_version=3.5
    ):
        # Precision-measurement dataset; default dates are redacted
        # placeholders from the open-source export.
        if with_tweet_types:
            self.full_query = FULL_QUERY_W_TWEET_TYPES
        query_settings = self.query_settings
        curr_query = self.produce_query(
            date_begin=begin_date,
            date_end=end_date,
            task_version=task_version,
            **query_settings["precision"],
        )
        curr_df = execute_query(client=CLIENT, query=curr_query)
        curr_df.rename(columns={"media_url": "media_presence"}, inplace=True)
        return curr_df
class ENLoaderWithSampling(ENLoader):
  """ENLoader variant that re-samples the raw data to control label balance.

  A first set is drawn from the prevalence data (or the full train split),
  and an optional second set is drawn either at random or by keyword matching
  (politics / insults / race). Precision data is always kept aside for
  validation/test.
  """

  # Keyword lists used for targeted sampling (redacted in the OSS release).
  keywords = {
    "politics": [
      ...
    ],
    "insults": [
      ...
    ],
    "race": [
      ...
    ],
  }
  # Default total / second-set sample sizes (redacted).
  n = ...
  N = ...

  def __init__(self, project):
    self.raw_loader = ENLoader(project=project)
    if project == ...:
      self.project = project
    else:
      raise ValueError

  def sample_with_weights(self, df, n):
    """Sample `n` rows without replacement.

    Every row gets the same weight (the positive-label share), so this is
    effectively a uniform sample; the weight records the action rate.
    """
    w = df["label"].value_counts(normalize=True)[1]
    dist = np.full((df.shape[0],), w)
    sampled_df = df.sample(n=n, weights=dist, replace=False)
    return sampled_df

  def sample_keywords(self, df, N, group):
    """Sample up to N/3 rows whose text matches `group` keywords.

    Returns (df_without_sampled_rows, sampled_rows).
    """
    print("\nmatching", group, "keywords...")
    keyword_list = self.keywords[group]
    match_df = df.loc[df.text.str.lower().str.contains("|".join(keyword_list), regex=True)]
    print("sampling N/3 from", group)
    if match_df.shape[0] <= N / 3:
      # Fix: report the group actually being sampled (message previously
      # said "race" for every group).
      print(
        "WARNING: Sampling only",
        match_df.shape[0],
        "instead of",
        N / 3,
        "examples from", group, "focused tweets due to insufficient data",
      )
      sample_df = match_df
    else:
      print(
        "sampling",
        group,
        "at",
        round(match_df["label"].value_counts(normalize=True)[1], 3),
        "% action rate",
      )
      sample_df = self.sample_with_weights(match_df, int(N / 3))
      print(sample_df.shape)
      print(sample_df.label.value_counts(normalize=True))
    print("\nshape of df before dropping sampled rows after", group, "matching..", df.shape[0])
    df = df.loc[
      df.index.difference(sample_df.index),
    ]
    print("\nshape of df after dropping sampled rows after", group, "matching..", df.shape[0])
    return df, sample_df

  def sample_first_set_helper(self, train_df, first_set, new_n):
    """Sample the first batch: from prevalence data when first_set == 'prev',
    otherwise from the whole train split."""
    if first_set == "prev":
      fset = train_df.loc[train_df["origin"].isin(["prevalence", "causal prevalence"])]
      print(
        "sampling prev at", round(fset["label"].value_counts(normalize=True)[1], 3), "% action rate"
      )
    else:
      fset = train_df
    n_fset = self.sample_with_weights(fset, new_n)
    print("len of sampled first set", n_fset.shape[0])
    print(n_fset.label.value_counts(normalize=True))
    return n_fset

  def sample(self, df, first_set, second_set, keyword_sampling, n, N):
    """Assemble the training dataframe from the sampled first/second sets plus
    the untouched precision (val/test) rows, shuffled.

    Fix: DataFrame.append was removed in pandas 2.0 — all concatenations now
    use pandas.concat, which is behaviorally equivalent here.
    """
    train_df = df[df.origin != "precision"]
    val_test_df = df[df.origin == "precision"]
    print("\nsampling first set of data")
    new_n = n - N if second_set is not None else n
    n_fset = self.sample_first_set_helper(train_df, first_set, new_n)
    print("\nsampling second set of data")
    # Drop already-sampled rows so the second set can't overlap the first.
    train_df = train_df.loc[
      train_df.index.difference(n_fset.index),
    ]
    if second_set is None:
      print("no second set sampling being done")
      df = pandas.concat((n_fset, val_test_df))
      return df
    if second_set == "prev":
      sset = train_df.loc[train_df["origin"].isin(["prevalence", "causal prevalence"])]
    elif second_set == "fdr":
      sset = train_df.loc[train_df["origin"] == "fdr"]
    else:
      sset = train_df
    if keyword_sampling:
      print("sampling based off of keywords defined...")
      print("second set is", second_set, "with length", sset.shape[0])
      sset, n_politics = self.sample_keywords(sset, N, "politics")
      sset, n_insults = self.sample_keywords(sset, N, "insults")
      sset, n_race = self.sample_keywords(sset, N, "race")
      n_sset = pandas.concat((n_politics, n_insults, n_race))
      print("len of sampled second set", n_sset.shape[0])
    else:
      print(
        "No keyword sampling. Instead random sampling from",
        second_set,
        "at",
        round(sset["label"].value_counts(normalize=True)[1], 3),
        "% action rate",
      )
      n_sset = self.sample_with_weights(sset, N)
      print("len of sampled second set", n_sset.shape[0])
      print(n_sset.label.value_counts(normalize=True))
    df = pandas.concat((n_fset, n_sset, val_test_df))
    df = df.sample(frac=1).reset_index(drop=True)
    return df

  def load_data(
    self, first_set="prev", second_set=None, keyword_sampling=False, test=False, **kwargs
  ):
    """Load raw data via the wrapped ENLoader, then apply the sampling scheme."""
    n = kwargs.get("n", self.n)
    N = kwargs.get("N", self.N)
    df = self.raw_loader.load_data(test=test, **kwargs)
    return self.sample(df, first_set, second_set, keyword_sampling, n, N)
class I18nLoader(DataframeLoader):
  """Loads non-English (i18n) adhoc and prevalence data for one language."""

  def __init__(self):
    super().__init__(project=...)
    # Redacted import path in the open-source release — resolve before use.
    from archive.settings.... import ACCEPTED_LANGUAGES, QUERY_SETTINGS
    self.accepted_languages = ACCEPTED_LANGUAGES
    # Copy so per-instance table mutations don't leak into the settings module.
    self.query_settings = dict(QUERY_SETTINGS)

  def produce_query(self, language, query, dataset, table, lang):
    """Render a query template and restrict it to rows reviewed in `language`."""
    query = query.format(dataset=dataset, table=table)
    add_query = f"AND reviewed.{lang}='{language}'"
    query += add_query
    return query

  def query_keys(self, language, task=2, size="50"):
    """Return query settings for annotation task 2 or 3.

    Task-2 tables are language-specific (names redacted); mutates
    self.query_settings["adhoc_v2"]["table"] as a side effect.
    """
    if task == 2:
      if language == "ar":
        self.query_settings["adhoc_v2"]["table"] = "..."
      elif language == "tr":
        self.query_settings["adhoc_v2"]["table"] = "..."
      elif language == "es":
        self.query_settings["adhoc_v2"]["table"] = f"..."
      else:
        self.query_settings["adhoc_v2"]["table"] = "..."
      return self.query_settings["adhoc_v2"]
    if task == 3:
      return self.query_settings["adhoc_v3"]
    raise ValueError(f"There are no other tasks than 2 or 3. {task} does not exist.")

  def load_data(self, language, test=False, task=2):
    """Load adhoc data (plus, outside test mode, extra Spanish adhoc and
    prevalence data) for `language`, then clean it.

    NOTE(review): relies on `self.clean`, which is not defined on
    DataframeLoader in this file — presumably provided by a cleaner
    mixin/subclass; verify before reuse.
    """
    if language not in self.accepted_languages:
      raise ValueError(
        f"Language not in the data {language}. Accepted values are " f"{self.accepted_languages}"
      )
    print(".... adhoc data")
    key_dict = self.query_keys(language=language, task=task)
    query_adhoc = self.produce_query(language=language, **key_dict)
    if test:
      query_adhoc += " LIMIT 500"
    adhoc_df = execute_query(CLIENT, query_adhoc)
    if not (test or language == "tr" or task == 3):
      if language == "es":
        print(".... additional adhoc data")
        key_dict = self.query_keys(language=language, size="100")
        query_adhoc = self.produce_query(language=language, **key_dict)
        adhoc_df = pandas.concat(
          (adhoc_df, execute_query(CLIENT, query_adhoc)), axis=0, ignore_index=True
        )
      print(".... prevalence data")
      query_prev = self.produce_query(language=language, **self.query_settings["prevalence_v2"])
      prev_df = execute_query(CLIENT, query_prev)
      prev_df["description"] = "Prevalence"
      adhoc_df = pandas.concat((adhoc_df, prev_df), axis=0, ignore_index=True)
    return self.clean(adhoc_df)

View File

@ -1,284 +0,0 @@
from importlib import import_module
import os
from toxicity_ml_pipeline.settings.default_settings_tox import (
INNER_CV,
LOCAL_DIR,
MAX_SEQ_LENGTH,
NUM_PREFETCH,
NUM_WORKERS,
OUTER_CV,
TARGET_POS_PER_EPOCH,
)
from toxicity_ml_pipeline.utils.helpers import execute_command
import numpy as np
import pandas
from sklearn.model_selection import StratifiedKFold
import tensorflow as tf
try:
from transformers import AutoTokenizer, DataCollatorWithPadding
except ModuleNotFoundError:
print("...")
else:
from datasets import Dataset
class BalancedMiniBatchLoader(object):
  """Builds class-balanced tf.data mini-batches plus CV splits for training.

  Outer CV isolates the test set (by fold index, or by date when
  n_outer_splits == "time"); inner stratified CV yields train/validation
  splits. Mini-batches are resampled so a fixed share (`perc_training_tox`)
  of each batch is toxic.
  """

  def __init__(
    self,
    fold,
    mb_size,
    seed,
    perc_training_tox,
    scope="TOX",
    project=...,
    dual_head=None,
    n_outer_splits=None,
    n_inner_splits=None,
    sample_weights=None,
    huggingface=False,
  ):
    # Share of toxic examples per mini-batch must lie in ]0; 0.5].
    if 0 >= perc_training_tox or perc_training_tox > 0.5:
      raise ValueError("Perc_training_tox should be in ]0; 0.5]")
    self.perc_training_tox = perc_training_tox
    if not n_outer_splits:
      n_outer_splits = OUTER_CV
    if isinstance(n_outer_splits, int):
      self.n_outer_splits = n_outer_splits
      self.get_outer_fold = self._get_outer_cv_fold
      if fold < 0 or fold >= self.n_outer_splits or int(fold) != fold:
        raise ValueError(f"Number of fold should be an integer in [0 ; {self.n_outer_splits} [.")
    elif n_outer_splits == "time":
      # Time-based split: test boundaries come from the project settings
      # module instead of a CV fold index.
      self.get_outer_fold = self._get_time_fold
      if fold != "time":
        raise ValueError(
          "To avoid repeating the same run many times, the external fold"
          "should be time when test data is split according to dates."
        )
      try:
        setting_file = import_module(f"toxicity_ml_pipeline.settings.{scope.lower()}{project}_settings")
      except ModuleNotFoundError:
        raise ValueError(f"You need to define a setting file for your project {project}.")
      self.test_begin_date = setting_file.TEST_BEGIN_DATE
      self.test_end_date = setting_file.TEST_END_DATE
    else:
      raise ValueError(
        f"Argument n_outer_splits should either an integer or 'time'. Provided: {n_outer_splits}"
      )
    self.n_inner_splits = n_inner_splits if n_inner_splits is not None else INNER_CV
    self.seed = seed
    self.mb_size = mb_size
    self.fold = fold
    self.sample_weights = sample_weights
    self.dual_head = dual_head
    self.huggingface = huggingface
    if self.huggingface:
      self._load_tokenizer()

  def _load_tokenizer(self):
    """Copy BERTweet locally and load its tokenizer (HuggingFace path only)."""
    print("Making a local copy of Bertweet-base model")
    local_model_dir = os.path.join(LOCAL_DIR, "models")
    cmd = f"mkdir {local_model_dir} ; gsutil -m cp -r gs://... {local_model_dir}"
    execute_command(cmd)
    self.tokenizer = AutoTokenizer.from_pretrained(
      os.path.join(local_model_dir, "bertweet-base"), normalization=True
    )

  def tokenize_function(self, el):
    """Tokenize one example dict (expects an 'text' entry) to fixed length."""
    return self.tokenizer(
      el["text"],
      max_length=MAX_SEQ_LENGTH,
      padding="max_length",
      truncation=True,
      add_special_tokens=True,
      return_token_type_ids=False,
      return_attention_mask=False,
    )

  def _get_stratified_kfold(self, n_splits):
    return StratifiedKFold(shuffle=True, n_splits=n_splits, random_state=self.seed)

  def _get_time_fold(self, df):
    """Split by date: rows within [test_begin, test_end] are test, earlier rows train."""
    test_begin_date = pandas.to_datetime(self.test_begin_date).date()
    test_end_date = pandas.to_datetime(self.test_end_date).date()
    print(f"Test is going from {test_begin_date} to {test_end_date}.")
    test_data = df.query("@test_begin_date <= date <= @test_end_date")
    query = "date < @test_begin_date"
    other_set = df.query(query)
    return other_set, test_data

  def _get_outer_cv_fold(self, df):
    """Return the (train, test) split for the configured outer-CV fold index."""
    labels = df.int_label
    stratifier = self._get_stratified_kfold(n_splits=self.n_outer_splits)
    # Iterate until the requested fold; relies on the stratifier's fixed seed.
    k = 0
    for train_index, test_index in stratifier.split(np.zeros(len(labels)), labels):
      if k == self.fold:
        break
      k += 1
    train_data = df.iloc[train_index].copy()
    test_data = df.iloc[test_index].copy()
    return train_data, test_data

  def get_steps_per_epoch(self, nb_pos_examples):
    # Sized so each "epoch" sees at least TARGET_POS_PER_EPOCH positives given
    # the per-batch toxic share.
    return int(max(TARGET_POS_PER_EPOCH, nb_pos_examples) / self.mb_size / self.perc_training_tox)

  def make_huggingface_tensorflow_ds(self, group, mb_size=None, shuffle=True):
    """Wrap one label group as a (repeating when shuffled) HF→TF dataset."""
    huggingface_ds = Dataset.from_pandas(group).map(self.tokenize_function, batched=True)
    data_collator = DataCollatorWithPadding(tokenizer=self.tokenizer, return_tensors="tf")
    tensorflow_ds = huggingface_ds.to_tf_dataset(
      columns=["input_ids"],
      label_cols=["labels"],
      shuffle=shuffle,
      batch_size=self.mb_size if mb_size is None else mb_size,
      collate_fn=data_collator,
    )
    if shuffle:
      return tensorflow_ds.repeat()
    return tensorflow_ds

  def make_pure_tensorflow_ds(self, df, nb_samples):
    """Build an infinite shuffled TF dataset of (text, label[, weight]) tuples."""
    buffer_size = nb_samples * 2
    if self.sample_weights is not None:
      if self.sample_weights not in df.columns:
        raise ValueError
      ds = tf.data.Dataset.from_tensor_slices(
        (df.text.values, df.label.values, df[self.sample_weights].values)
      )
    elif self.dual_head:
      # One label per head; the content head is one-hot over 3 classes.
      label_d = {f'{e}_output': df[f'{e}_label'].values for e in self.dual_head}
      label_d['content_output'] = tf.keras.utils.to_categorical(label_d['content_output'], num_classes=3)
      ds = tf.data.Dataset.from_tensor_slices((df.text.values, label_d))
    else:
      ds = tf.data.Dataset.from_tensor_slices((df.text.values, df.label.values))
    ds = ds.shuffle(buffer_size, seed=self.seed, reshuffle_each_iteration=True).repeat()
    return ds

  def get_balanced_dataset(self, training_data, size_limit=None, return_as_batch=True):
    """Mix per-label datasets so batches hit the configured toxic share."""
    training_data = training_data.sample(frac=1, random_state=self.seed)
    nb_samples = training_data.shape[0] if not size_limit else size_limit
    num_classes = training_data.int_label.nunique()
    toxic_class = training_data.int_label.max()
    if size_limit:
      training_data = training_data[: size_limit * num_classes]
    print(
      ".... {} examples, incl. {:.2f}% tox in train, {} classes".format(
        nb_samples,
        100 * training_data[training_data.int_label == toxic_class].shape[0] / nb_samples,
        num_classes,
      )
    )
    label_groups = training_data.groupby("int_label")
    if self.huggingface:
      label_datasets = {
        label: self.make_huggingface_tensorflow_ds(group) for label, group in label_groups
      }
    else:
      label_datasets = {
        label: self.make_pure_tensorflow_ds(group, nb_samples=nb_samples * 2)
        for label, group in label_groups
      }
    datasets = [label_datasets[0], label_datasets[1]]
    weights = [1 - self.perc_training_tox, self.perc_training_tox]
    if num_classes == 3:
      # Split the toxic share evenly across the two positive classes.
      datasets.append(label_datasets[2])
      weights = [1 - self.perc_training_tox, self.perc_training_tox / 2, self.perc_training_tox / 2]
    elif num_classes != 2:
      raise ValueError("Currently it should not be possible to get other than 2 or 3 classes")
    resampled_ds = tf.data.experimental.sample_from_datasets(datasets, weights, seed=self.seed)
    if return_as_batch and not self.huggingface:
      return resampled_ds.batch(
        self.mb_size, drop_remainder=True, num_parallel_calls=NUM_WORKERS, deterministic=True
      ).prefetch(NUM_PREFETCH)
    return resampled_ds

  @staticmethod
  def _compute_int_labels(full_df):
    """Ensure an integer `int_label` column exists (binarizing soft labels at 0.5)."""
    if full_df.label.dtype == int:
      full_df["int_label"] = full_df.label
    elif "int_label" not in full_df.columns:
      if full_df.label.max() > 1:
        raise ValueError("Binarizing labels that should not be.")
      full_df["int_label"] = np.where(full_df.label >= 0.5, 1, 0)
    return full_df

  def __call__(self, full_df, *args, **kwargs):
    """Yield (mini_batches, steps_per_epoch, val_data, test_data) per inner fold."""
    full_df = self._compute_int_labels(full_df)
    train_data, test_data = self.get_outer_fold(df=full_df)
    stratifier = self._get_stratified_kfold(n_splits=self.n_inner_splits)
    for train_index, val_index in stratifier.split(
      np.zeros(train_data.shape[0]), train_data.int_label
    ):
      curr_train_data = train_data.iloc[train_index]
      mini_batches = self.get_balanced_dataset(curr_train_data)
      steps_per_epoch = self.get_steps_per_epoch(
        nb_pos_examples=curr_train_data[curr_train_data.int_label != 0].shape[0]
      )
      val_data = train_data.iloc[val_index].copy()
      yield mini_batches, steps_per_epoch, val_data, test_data

  def simple_cv_load(self, full_df):
    """Outer split only — no inner validation fold."""
    full_df = self._compute_int_labels(full_df)
    train_data, test_data = self.get_outer_fold(df=full_df)
    if test_data.shape[0] == 0:
      # Degenerate split: fall back to a slice of train so evaluation can run.
      test_data = train_data.iloc[:500]
    mini_batches = self.get_balanced_dataset(train_data)
    steps_per_epoch = self.get_steps_per_epoch(
      nb_pos_examples=train_data[train_data.int_label != 0].shape[0]
    )
    return mini_batches, test_data, steps_per_epoch

  def no_cv_load(self, full_df):
    """No CV: precision-origin rows become val/test, the rest is training data."""
    full_df = self._compute_int_labels(full_df)
    val_test = full_df[full_df.origin == "precision"].copy(deep=True)
    val_data, test_data = self.get_outer_fold(df=val_test)
    train_data = full_df.drop(full_df[full_df.origin == "precision"].index, axis=0)
    if test_data.shape[0] == 0:
      test_data = train_data.iloc[:500]
    mini_batches = self.get_balanced_dataset(train_data)
    if train_data.int_label.nunique() == 1:
      raise ValueError('Should be at least two labels')
    num_examples = train_data[train_data.int_label == 1].shape[0]
    if train_data.int_label.nunique() > 2:
      # Multi-class: size epochs off the second-most-frequent positive class.
      second_most_frequent_label = train_data.loc[train_data.int_label != 0, 'int_label'].mode().values[0]
      num_examples = train_data[train_data.int_label == second_most_frequent_label].shape[0] * 2
    steps_per_epoch = self.get_steps_per_epoch(nb_pos_examples=num_examples)
    return mini_batches, steps_per_epoch, val_data, test_data

View File

@ -1,227 +0,0 @@
import os
from toxicity_ml_pipeline.settings.default_settings_tox import LOCAL_DIR, MAX_SEQ_LENGTH
try:
from toxicity_ml_pipeline.optim.losses import MaskedBCE
except ImportError:
print('No MaskedBCE loss')
from toxicity_ml_pipeline.utils.helpers import execute_command
import tensorflow as tf
try:
from twitter.cuad.representation.models.text_encoder import TextEncoder
except ModuleNotFoundError:
print("No TextEncoder package")
try:
from transformers import TFAutoModelForSequenceClassification
except ModuleNotFoundError:
print("No HuggingFace package")
LOCAL_MODEL_DIR = os.path.join(LOCAL_DIR, "models")
def reload_model_weights(weights_dir, language, **kwargs):
  """Rebuild the architecture for `language` and restore weights from `weights_dir`."""
  if language == "en":
    model_type = "twitter_bert_base_en_uncased_mlm"
  else:
    model_type = "twitter_multilingual_bert_base_cased_mlm"
  # The optimizer is only needed to compile the graph; its state is not restored.
  model = load(
    optimizer=tf.keras.optimizers.Adam(0.01),
    seed=42,
    model_type=model_type,
    **kwargs,
  )
  model.load_weights(weights_dir)
  return model
def _locally_copy_models(model_type):
  """Copy the model and its matching preprocessor from GCS into LOCAL_MODEL_DIR.

  Returns the preprocessor directory name; raises NotImplementedError for
  unknown model types.
  """
  preprocessor_by_model = {
    "twitter_multilingual_bert_base_cased_mlm": "bert_multi_cased_preprocess_3",
    "twitter_bert_base_en_uncased_mlm": "bert_en_uncased_preprocess_3",
  }
  if model_type not in preprocessor_by_model:
    raise NotImplementedError
  preprocessor = preprocessor_by_model[model_type]
  copy_cmd = """mkdir {local_dir}
gsutil cp -r ...
gsutil cp -r ..."""
  execute_command(
    copy_cmd.format(model_type=model_type, preprocessor=preprocessor, local_dir=LOCAL_MODEL_DIR)
  )
  return preprocessor
def load_encoder(model_type, trainable):
  """Instantiate the TextEncoder, falling back to locally copied weights on failure."""
  shared_kwargs = dict(
    max_seq_lengths=MAX_SEQ_LENGTH,
    cluster="gcp",
    trainable=trainable,
    enable_dynamic_shapes=True,
  )
  try:
    return TextEncoder(model_type=model_type, **shared_kwargs)
  except (OSError, tf.errors.AbortedError) as e:
    # Remote load failed: copy the model + preprocessor locally and retry.
    print(e)
    preprocessor = _locally_copy_models(model_type)
    return TextEncoder(
      local_model_path=f"models/{model_type}",
      local_preprocessor_path=f"models/{preprocessor}",
      **shared_kwargs,
    )
def get_loss(loss_name, from_logits, **kwargs):
  """Map a (case-insensitive) loss name to a configured tf.keras loss instance.

  Supported: bce, cce, scce, focal_bce (kwargs: gamma), masked_bce.
  inv_kl_loss is recognized but not implemented. Raises ValueError otherwise.
  """
  name = loss_name.lower()
  if name == "bce":
    print("Binary CE loss")
    return tf.keras.losses.BinaryCrossentropy(from_logits=from_logits)
  elif name == "cce":
    print("Categorical cross-entropy loss")
    return tf.keras.losses.CategoricalCrossentropy(from_logits=from_logits)
  elif name == "scce":
    print("Sparse categorical cross-entropy loss")
    return tf.keras.losses.SparseCategoricalCrossentropy(from_logits=from_logits)
  elif name == "focal_bce":
    gamma = kwargs.get("gamma", 2)
    print("Focal binary CE loss", gamma)
    return tf.keras.losses.BinaryFocalCrossentropy(gamma=gamma, from_logits=from_logits)
  elif name == "masked_bce":
    # Masked BCE only supports single-task, probability-space predictions.
    if from_logits or kwargs.get("multitask", False):
      raise NotImplementedError
    print(f"Masked Binary Cross Entropy")
    return MaskedBCE()
  elif name == "inv_kl_loss":
    raise NotImplementedError
  raise ValueError(
    f"This loss name is not valid: {loss_name}. Accepted loss names: BCE, masked BCE, CCE, sCCE, "
    f"Focal_BCE, inv_KL_loss"
  )
def _add_additional_embedding_layer(doc_embedding, glorot, seed):
  """Append a 768-unit tanh projection followed by dropout to the embedding."""
  projection = tf.keras.layers.Dense(768, activation="tanh", kernel_initializer=glorot)
  dropout = tf.keras.layers.Dropout(rate=0.1, seed=seed)
  return dropout(projection(doc_embedding))
def _get_bias(**kwargs):
  """Constant initializer for the output bias; 'smart_bias_value' defaults to 0."""
  bias_value = kwargs.get('smart_bias_value', 0)
  print('Smart bias init to ', bias_value)
  return tf.keras.initializers.Constant(bias_value)
def load_inhouse_bert(model_type, trainable, seed, **kwargs):
  """Build a Keras classifier on top of the in-house BERT encoder.

  Returns (model, from_logits) with from_logits always False, since the
  output heads use sigmoid/softmax activations.
  """
  text_input = tf.keras.layers.Input(shape=(), dtype=tf.string)
  encoder = load_encoder(model_type=model_type, trainable=trainable)
  embedding = encoder([text_input])["pooled_output"]
  embedding = tf.keras.layers.Dropout(rate=0.1, seed=seed)(embedding)
  glorot = tf.keras.initializers.glorot_uniform(seed=seed)
  if kwargs.get("additional_layer", False):
    embedding = _add_additional_embedding_layer(embedding, glorot, seed)
  if kwargs.get('content_num_classes', None):
    # Dual-head setup: a target head plus a multi-class content head.
    target_probs = get_last_layer(glorot=glorot, last_layer_name='target_output', **kwargs)(embedding)
    content_probs = get_last_layer(
      num_classes=kwargs['content_num_classes'],
      last_layer_name='content_output',
      glorot=glorot,
    )(embedding)
    outputs = [target_probs, content_probs]
  else:
    outputs = get_last_layer(glorot=glorot, **kwargs)(embedding)
  return tf.keras.models.Model(inputs=text_input, outputs=outputs), False
def get_last_layer(**kwargs):
  """Create the output Dense layer.

  Chooses, in order: softmax over `num_classes` (>1), per-rater sigmoid over
  `num_raters` (>1), or a single sigmoid unit.
  """
  output_bias = _get_bias(**kwargs)
  if 'glorot' in kwargs:
    glorot = kwargs['glorot']
  else:
    glorot = tf.keras.initializers.glorot_uniform(seed=kwargs['seed'])
  layer_name = kwargs.get('last_layer_name', 'dense_1')
  if kwargs.get('num_classes', 1) > 1:
    return tf.keras.layers.Dense(
      kwargs["num_classes"], activation="softmax", kernel_initializer=glorot,
      bias_initializer=output_bias, name=layer_name
    )
  if kwargs.get('num_raters', 1) > 1:
    # Multi-rater output is incompatible with multitask training.
    if kwargs.get('multitask', False):
      raise NotImplementedError
    return tf.keras.layers.Dense(
      kwargs['num_raters'], activation="sigmoid", kernel_initializer=glorot,
      bias_initializer=output_bias, name='probs')
  return tf.keras.layers.Dense(
    1, activation="sigmoid", kernel_initializer=glorot,
    bias_initializer=output_bias, name=layer_name
  )
def load_bertweet(**kwargs):
  """Load the locally copied BERTweet sequence classifier.

  Returns (model, from_logits) with from_logits=True — the HF classification
  head emits logits. Raises NotImplementedError for num_classes > 2.
  """
  # Fix: validate BEFORE paying the cost of loading the checkpoint
  # (previously the model was instantiated first, then discarded on error).
  if "num_classes" in kwargs and kwargs["num_classes"] > 2:
    raise NotImplementedError
  bert = TFAutoModelForSequenceClassification.from_pretrained(
    os.path.join(LOCAL_MODEL_DIR, "bertweet-base"),
    num_labels=1,
    classifier_dropout=0.1,
    hidden_size=768,
  )
  return bert, True
def load(
  optimizer,
  seed,
  model_type="twitter_multilingual_bert_base_cased_mlm",
  loss_name="BCE",
  trainable=True,
  **kwargs,
):
  """Build and compile the toxicity model.

  Dispatches to the HuggingFace BERTweet path or the in-house BERT path based
  on `model_type`; compiles with PR-AUC and ROC-AUC metrics. With
  'content_num_classes' in kwargs, compiles a dual-head (target + content)
  weighted loss instead of a single loss.
  """
  if model_type == "bertweet-base":
    model, from_logits = load_bertweet()
  else:
    model, from_logits = load_inhouse_bert(model_type, trainable, seed, **kwargs)
  # from_logits is forwarded so the metrics apply a sigmoid when needed.
  pr_auc = tf.keras.metrics.AUC(curve="PR", name="pr_auc", from_logits=from_logits)
  roc_auc = tf.keras.metrics.AUC(curve="ROC", name="roc_auc", from_logits=from_logits)
  loss = get_loss(loss_name, from_logits, **kwargs)
  if kwargs.get('content_num_classes', None):
    # Dual-head: separate losses per output, content head scaled by its weight.
    second_loss = get_loss(loss_name=kwargs['content_loss_name'], from_logits=from_logits)
    loss_weights = {'content_output': kwargs['content_loss_weight'], 'target_output': 1}
    model.compile(
      optimizer=optimizer,
      loss={'content_output': second_loss, 'target_output': loss},
      loss_weights=loss_weights,
      metrics=[pr_auc, roc_auc],
    )
  else:
    model.compile(
      optimizer=optimizer,
      loss=loss,
      metrics=[pr_auc, roc_auc],
    )
  print(model.summary(), "logits: ", from_logits)
  return model

View File

@ -1,220 +0,0 @@
from collections import defaultdict
import os
from toxicity_ml_pipeline.settings.default_settings_tox import REMOTE_LOGDIR
from toxicity_ml_pipeline.settings.default_settings_abs import LABEL_NAMES
from toxicity_ml_pipeline.utils.absv_utils import parse_labeled_data
from toxicity_ml_pipeline.utils.helpers import compute_precision_fixed_recall, execute_command
from sklearn.metrics import average_precision_score, roc_auc_score
import tensorflow as tf
import wandb
class NothingCallback(tf.keras.callbacks.Callback):
  """Debug callback: prints epoch/batch boundaries, no other side effects.

  (The printed strings are French: "ici" = here, "fin" = end.)
  """

  def on_epoch_begin(self, epoch, logs=None):
    print("ici, ", epoch)

  def on_epoch_end(self, epoch, logs=None):
    print("fin ", epoch)

  def on_train_batch_end(self, batch, logs=None):
    print("fin de batch ", batch)
class ControlledStoppingCheckpointCallback(tf.keras.callbacks.ModelCheckpoint):
  """ModelCheckpoint that also stops training after a fixed epoch.

  Useful to end a run deterministically at `stopping_epoch` while still
  checkpointing normally along the way.
  """

  def __init__(self, stopping_epoch, *args, **kwargs):
    super().__init__(*args, **kwargs)
    # Epoch index (0-based, as passed by Keras) at which to halt training.
    self.stopping_epoch = stopping_epoch

  def on_epoch_end(self, epoch, logs=None):
    super().on_epoch_end(epoch, logs)
    if epoch == self.stopping_epoch:
      self.model.stop_training = True
class SyncingTensorBoard(tf.keras.callbacks.TensorBoard):
  """TensorBoard callback that rsyncs the log directory to GCS after each epoch."""

  def __init__(self, remote_logdir=None, *args, **kwargs):
    super().__init__(*args, **kwargs)
    # Default remote destination comes from the pipeline settings.
    self.remote_logdir = remote_logdir if remote_logdir is not None else REMOTE_LOGDIR

  def on_epoch_end(self, epoch, logs=None):
    super().on_epoch_end(epoch, logs=logs)
    self.synchronize()

  def synchronize(self):
    # Sync the parent of log_dir so sibling run directories are included.
    base_dir = os.path.dirname(self.log_dir)
    cmd = f"gsutil -m rsync -r {base_dir} {self.remote_logdir}"
    execute_command(cmd)
class GradientLoggingTensorBoard(SyncingTensorBoard):
  """SyncingTensorBoard that additionally logs the global gradient norm.

  A fixed 32-example batch is drawn once from the validation data and reused
  for every gradient measurement, so the logged norms are comparable over time.
  """

  def __init__(self, loader, val_data, freq, *args, **kwargs):
    super().__init__(*args, **kwargs)
    val_dataset = loader.get_balanced_dataset(
      training_data=val_data, size_limit=50, return_as_batch=False
    )
    data_args = list(val_dataset.batch(32).take(1))[0]
    self.x_batch, self.y_batch = data_args[0], data_args[1]
    # Log every `freq` training batches.
    self.freq = freq
    self.counter = 0

  def _log_gradients(self):
    writer = self._train_writer
    with writer.as_default():
      with tf.GradientTape() as tape:
        y_pred = self.model(self.x_batch)
        loss = self.model.compiled_loss(y_true=self.y_batch, y_pred=y_pred)
      gradient_norm = tf.linalg.global_norm(tape.gradient(loss, self.model.trainable_weights))
      tf.summary.scalar("gradient_norm", data=gradient_norm, step=self.counter)
    writer.flush()

  def on_train_batch_end(self, batch, logs=None):
    # NOTE(review): calls the legacy alias on_batch_end rather than
    # on_train_batch_end on the parent — confirm equivalence for the
    # tf.keras version in use.
    super().on_batch_end(batch, logs=logs)
    self.counter += 1
    if batch % self.freq == 0:
      self._log_gradients()
class AdditionalResultLogger(tf.keras.callbacks.Callback):
  """Computes PR-AUC, ROC-AUC and precision@fixed-recall on a held-out set
  during training, tracks running maxima, and logs everything to wandb."""

  def __init__(
    self,
    data,
    set_,
    fixed_recall=0.85,
    from_logits=False,
    dataset_transform_func=None,
    batch_size=64,
    dual_head=None,
    *args,
    **kwargs,
  ):
    super().__init__(*args, **kwargs)
    # Name of the evaluation split ("validation", "test", ...).
    self.set_ = set_
    if data is None:
      return None
    self.single_head = True
    try:
      self.labels = data.int_label.values
    except AttributeError:
      # `data` is not a DataFrame — assume the project's dataset object
      # (multi-label abuse task) with to_dataframe/to_tf_dataset accessors.
      self.labels = data.to_dataframe()[LABEL_NAMES].values.astype('int')
      self.data = data.to_tf_dataset().map(parse_labeled_data).batch(batch_size)
      self.label_names = LABEL_NAMES
    else:
      self.label_names = ['']
      if dual_head:
        # Dual-head toxicity model: one label array per output head.
        self.label_names = [f'{e}_label' for e in dual_head]
        self.labels = {f'{e}_output': data[f'{e}_label'].values for e in dual_head}
        self.single_head = False
      if dataset_transform_func is None:
        self.data = data.text.values
      else:
        # e.g. the HuggingFace tokenizing dataset builder.
        self.data = dataset_transform_func(data, mb_size=batch_size, shuffle=False)
    finally:
      if len(self.label_names) == 1:
        self.metric_kw = {}
      else:
        # Per-label (unaveraged) metrics for multi-label evaluation.
        self.metric_kw = {'average': None}
    self.counter = 0
    self.best_metrics = defaultdict(float)
    self.from_logits = from_logits
    print(f"Loaded callback for {set_}, from_logits: {from_logits}, labels {self.label_names}")
    # Accept recall as either a fraction in ]0;1] or a percentage in ]1;100].
    if 1 < fixed_recall <= 100:
      fixed_recall = fixed_recall / 100
    elif not (0 < fixed_recall <= 100):
      raise ValueError("Threshold should be between 0 and 1, or 0 and 100")
    self.fixed_recall = fixed_recall
    self.batch_size = batch_size

  def compute_precision_fixed_recall(self, labels, preds):
    """Precision at the configured recall level (threshold discarded)."""
    result, _ = compute_precision_fixed_recall(labels=labels, preds=preds,
                                               fixed_recall=self.fixed_recall)
    return result

  def on_epoch_end(self, epoch, logs=None):
    self.additional_evaluations(step=epoch, eval_time="epoch")

  def on_train_batch_end(self, batch, logs=None):
    # Also evaluate mid-epoch, every 2000 training batches.
    self.counter += 1
    if self.counter % 2000 == 0:
      self.additional_evaluations(step=self.counter, eval_time="batch")

  def _binary_evaluations(self, preds, label_name=None, class_index=None):
    """Binary metrics for one head/class; rows labeled -1 are masked out."""
    mask = None
    curr_labels = self.labels
    if label_name is not None:
      curr_labels = self.labels[label_name]
      if class_index is not None:
        # One-vs-rest view of a multi-class head.
        curr_labels = (curr_labels == class_index).astype(int)
      if -1 in curr_labels:
        # -1 marks missing annotations; drop them from the evaluation.
        mask = curr_labels != -1
        curr_labels = curr_labels[mask]
        preds = preds[mask]
    return {
      f"precision_recall{self.fixed_recall}": self.compute_precision_fixed_recall(
        labels=curr_labels, preds=preds
      ),
      "pr_auc": average_precision_score(y_true=curr_labels, y_score=preds),
      "roc_auc": roc_auc_score(y_true=curr_labels, y_score=preds),
    }

  def _multiclass_evaluations(self, preds):
    """Per-label PR/ROC AUC for the multi-label (abuse) setting."""
    pr_auc_l = average_precision_score(y_true=self.labels, y_score=preds, **self.metric_kw)
    roc_auc_l = roc_auc_score(y_true=self.labels, y_score=preds, **self.metric_kw)
    metrics = {}
    for i, label in enumerate(self.label_names):
      metrics[f'pr_auc_{label}'] = pr_auc_l[i]
      metrics[f'roc_auc_{label}'] = roc_auc_l[i]
    return metrics

  def additional_evaluations(self, step, eval_time):
    """Run predictions on the held-out data, compute metrics, update maxima, log."""
    print("Evaluating ", self.set_, eval_time, step)
    preds = self.model.predict(x=self.data, batch_size=self.batch_size)
    if self.from_logits:
      preds = tf.keras.activations.sigmoid(preds.logits).numpy()
    if self.single_head:
      if len(self.label_names) == 1:
        metrics = self._binary_evaluations(preds)
      else:
        metrics = self._multiclass_evaluations(preds)
    else:
      # Dual head: identify the binary (target) vs multi-class (content)
      # output by its width.
      if preds[0].shape[1] == 1:
        binary_preds = preds[0]
        multic_preds = preds[1]
      else:
        binary_preds = preds[1]
        multic_preds = preds[0]
      binary_metrics = self._binary_evaluations(binary_preds, label_name='target_output')
      metrics = {f'{k}_target': v for k, v in binary_metrics.items()}
      num_classes = multic_preds.shape[1]
      for class_ in range(num_classes):
        binary_metrics = self._binary_evaluations(multic_preds[:, class_], label_name='content_output', class_index=class_)
        metrics.update({f'{k}_content_{class_}': v for k, v in binary_metrics.items()})
    for k, v in metrics.items():
      self.best_metrics[f"max_{k}"] = max(v, self.best_metrics[f"max_{k}"])
    self.log_metrics(metrics, step=step, eval_time=eval_time)

  def log_metrics(self, metrics_d, step, eval_time):
    """Send current + best metrics to wandb, nested under the split name."""
    # Validation logs are held back (commit=False) so they land in the same
    # wandb step as the subsequent training log.
    commit = False if self.set_ == "validation" else True
    to_report = {self.set_: {**metrics_d, **self.best_metrics}}
    if eval_time == "epoch":
      to_report["epoch"] = step
    wandb.log(to_report, commit=commit)

View File

@ -1,56 +0,0 @@
import tensorflow as tf
from keras.utils import tf_utils
from keras.utils import losses_utils
from keras import backend
def inv_kl_divergence(y_true, y_pred):
  """KL(y_pred || y_true) summed over the last axis, with epsilon clipping
  to keep the log finite."""
  pred = tf.convert_to_tensor(y_pred)
  true = tf.cast(y_true, pred.dtype)
  true = backend.clip(true, backend.epsilon(), 1)
  pred = backend.clip(pred, backend.epsilon(), 1)
  return tf.reduce_sum(pred * tf.math.log(pred / true), axis=-1)
def masked_bce(y_true, y_pred):
  """Binary cross-entropy restricted to entries whose label is not -1
  (-1 marks missing annotations)."""
  labels = tf.cast(y_true, dtype=tf.float32)
  keep = labels != -1
  return tf.keras.metrics.binary_crossentropy(
    tf.boolean_mask(labels, keep), tf.boolean_mask(y_pred, keep)
  )
class LossFunctionWrapper(tf.keras.losses.Loss):
  """Wraps a plain loss function `fn(y_true, y_pred, **kwargs)` as a Keras Loss.

  Mirrors the (private) keras.losses.LossFunctionWrapper so custom functions
  get reduction handling and config serialization.
  """

  def __init__(self,
               fn,
               reduction=losses_utils.ReductionV2.AUTO,
               name=None,
               **kwargs):
    super().__init__(reduction=reduction, name=name)
    self.fn = fn
    # Extra keyword args forwarded to fn on every call.
    self._fn_kwargs = kwargs

  def call(self, y_true, y_pred):
    if tf.is_tensor(y_pred) and tf.is_tensor(y_true):
      # Align ranks (e.g. squeeze a trailing 1-dim) before computing the loss.
      y_pred, y_true = losses_utils.squeeze_or_expand_dimensions(y_pred, y_true)
    # Convert fn under the current autograph policy so it works in graph mode.
    ag_fn = tf.__internal__.autograph.tf_convert(self.fn, tf.__internal__.autograph.control_status_ctx())
    return ag_fn(y_true, y_pred, **self._fn_kwargs)

  def get_config(self):
    config = {}
    for k, v in self._fn_kwargs.items():
      # Evaluate tensors/variables so the config is plain-serializable.
      config[k] = backend.eval(v) if tf_utils.is_tensor_or_variable(v) else v
    base_config = super().get_config()
    return dict(list(base_config.items()) + list(config.items()))
class InvKLD(LossFunctionWrapper):
  """Keras Loss wrapping `inv_kl_divergence`."""

  def __init__(self, reduction=losses_utils.ReductionV2.AUTO, name='inv_kl_divergence'):
    super().__init__(inv_kl_divergence, reduction=reduction, name=name)
class MaskedBCE(LossFunctionWrapper):
  """Keras Loss wrapping `masked_bce` (ignores labels equal to -1)."""

  def __init__(self, reduction=losses_utils.ReductionV2.AUTO, name='masked_bce'):
    super().__init__(masked_bce, reduction=reduction, name=name)

View File

@ -1,44 +0,0 @@
from typing import Callable
import tensorflow as tf
class WarmUp(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Polynomial warmup wrapped around another decay schedule.

  For step < warmup_steps: lr = initial_learning_rate * (step/warmup_steps)^power.
  Afterwards, the wrapped `decay_schedule_fn` is evaluated at
  (step - warmup_steps).
  """

  def __init__(
    self,
    initial_learning_rate: float,
    decay_schedule_fn: Callable,
    warmup_steps: int,
    power: float = 1.0,
    name: str = "",
  ):
    super().__init__()
    self.initial_learning_rate = initial_learning_rate
    self.warmup_steps = warmup_steps
    self.power = power
    self.decay_schedule_fn = decay_schedule_fn
    self.name = name

  def __call__(self, step):
    with tf.name_scope(self.name or "WarmUp") as name:
      step_f = tf.cast(step, tf.float32)
      warmup_f = tf.cast(self.warmup_steps, tf.float32)
      fraction_done = step_f / warmup_f
      warmup_lr = self.initial_learning_rate * tf.math.pow(fraction_done, self.power)
      return tf.cond(
        step_f < warmup_f,
        lambda: warmup_lr,
        lambda: self.decay_schedule_fn(step - self.warmup_steps),
        name=name,
      )

  def get_config(self):
    """Serialization support for Keras."""
    return {
      "initial_learning_rate": self.initial_learning_rate,
      "decay_schedule_fn": self.decay_schedule_fn,
      "warmup_steps": self.warmup_steps,
      "power": self.power,
      "name": self.name,
    }

View File

@ -1,54 +0,0 @@
from toxicity_ml_pipeline.load_model import reload_model_weights
from toxicity_ml_pipeline.utils.helpers import load_inference_func, upload_model
import numpy as np
import tensorflow as tf
def score(language, df, gcs_model_path, batch_size=64, text_col="text", kw="", **kwargs):
    """Score the rows of *df* with the model stored at *gcs_model_path*.

    Adds one or more ``prediction_{kw}*`` columns to *df* and returns it.
    Prefers the model's serialized serving signature; if none is found
    (OSError), rebuilds the Keras model and calls ``predict`` directly.

    Args:
      language: only "en" is supported; other languages raise.
      df: dataframe holding the text to score.
      gcs_model_path: GCS path of the model folder or checkpoint.
      batch_size: prediction batch size.
      text_col: name of the text column in *df*.
      kw: suffix inserted into the prediction column names.
      **kwargs: forwarded to ``reload_model_weights`` (e.g. num_classes).

    Raises:
      NotImplementedError: non-English input, multi-class single head,
        or more than two output heads.
    """
    if language != "en":
        raise NotImplementedError(
            "Data preprocessing not implemented here, needs to be added for i18n models"
        )
    model_folder = upload_model(full_gcs_model_path=gcs_model_path)
    try:
        inference_func = load_inference_func(model_folder)
    except OSError:
        # No serialized serving signature: rebuild the model and predict.
        model = reload_model_weights(model_folder, language, **kwargs)
        preds = model.predict(x=df[text_col], batch_size=batch_size)
        # Fix: use isinstance rather than `type(preds) != list`.
        if not isinstance(preds, list):
            # Single-head model. Multi-column outputs are averaged unless
            # the caller explicitly declared a multi-class head.
            if len(preds.shape) > 1 and preds.shape[1] > 1:
                if 'num_classes' in kwargs and kwargs['num_classes'] > 1:
                    raise NotImplementedError
                preds = np.mean(preds, 1)
            df[f"prediction_{kw}"] = preds
        else:
            # Dual-head model: at most two output arrays are supported.
            if len(preds) > 2:
                raise NotImplementedError
            for preds_arr in preds:
                if preds_arr.shape[1] == 1:
                    df[f"prediction_{kw}_target"] = preds_arr
                else:
                    for ind in range(preds_arr.shape[1]):
                        df[f"prediction_{kw}_content_{ind}"] = preds_arr[:, ind]
        return df
    else:
        return _get_score(inference_func, df, kw=kw, batch_size=batch_size, text_col=text_col)
def _get_score(inference_func, df, text_col="text", kw="", batch_size=64):
    """Run a serving-signature *inference_func* over *df[text_col]* in batches.

    Writes the scores into a ``prediction_{kw}`` column and returns *df*.
    """
    n_rows = df.shape[0]
    predictions = np.zeros(shape=n_rows, dtype=float)
    for start in range(0, n_rows, batch_size):
        batch = df[text_col].values[start : start + batch_size]
        outputs = inference_func(input_1=tf.constant(batch))
        # The signature returns a dict with one output tensor; keep its
        # first column as the score.
        predictions[start : start + batch_size] = list(outputs.values())[0].numpy()[:, 0]
    df[f"prediction_{kw}"] = predictions
    return df

View File

@ -1,38 +0,0 @@
import os

# Google Cloud project that owns the BigQuery datasets and GCS buckets.
TEAM_PROJECT = "twttr-toxicity-prod"

try:
    from google.cloud import bigquery
except (ModuleNotFoundError, ImportError):
    # Environments without the Google SDK still import this module;
    # BigQuery-backed helpers must check CLIENT for None.
    print("No Google packages")
    CLIENT = None
else:
    from google.auth.exceptions import DefaultCredentialsError
    try:
        CLIENT = bigquery.Client(project=TEAM_PROJECT)
    except DefaultCredentialsError as e:
        # Missing credentials degrade to CLIENT = None instead of crashing.
        CLIENT = None
        print("Issue at logging time", e)

# Data/bucket locations are redacted ("..." placeholders) for release.
TRAINING_DATA_LOCATION = f"..."
GCS_ADDRESS = "..."
LOCAL_DIR = os.getcwd()
# NOTE(review): the two paths below look like str.format templates (no
# f-prefix), filled in by consumers with GCS_ADDRESS — confirm callers.
REMOTE_LOGDIR = "{GCS_ADDRESS}/logs"
MODEL_DIR = "{GCS_ADDRESS}/models"

# Labelling-task versions for which data exists.
EXISTING_TASK_VERSIONS = {3, 3.5}

# Training defaults (some values redacted with Ellipsis).
RANDOM_SEED = ...
TRAIN_EPOCHS = 4
MINI_BATCH_SIZE = 32
TARGET_POS_PER_EPOCH = 5000
PERC_TRAINING_TOX = ...
MAX_SEQ_LENGTH = 100
WARM_UP_PERC = 0.1
OUTER_CV = 5
INNER_CV = 5
NUM_PREFETCH = 5
NUM_WORKERS = 10

View File

@ -1,401 +0,0 @@
from datetime import datetime
from importlib import import_module
import os
from toxicity_ml_pipeline.data.data_preprocessing import (
DefaultENNoPreprocessor,
DefaultENPreprocessor,
)
from toxicity_ml_pipeline.data.dataframe_loader import ENLoader, ENLoaderWithSampling
from toxicity_ml_pipeline.data.mb_generator import BalancedMiniBatchLoader
from toxicity_ml_pipeline.load_model import load, get_last_layer
from toxicity_ml_pipeline.optim.callbacks import (
AdditionalResultLogger,
ControlledStoppingCheckpointCallback,
GradientLoggingTensorBoard,
SyncingTensorBoard,
)
from toxicity_ml_pipeline.optim.schedulers import WarmUp
from toxicity_ml_pipeline.settings.default_settings_abs import GCS_ADDRESS as ABS_GCS
from toxicity_ml_pipeline.settings.default_settings_tox import (
GCS_ADDRESS as TOX_GCS,
MODEL_DIR,
RANDOM_SEED,
REMOTE_LOGDIR,
WARM_UP_PERC,
)
from toxicity_ml_pipeline.utils.helpers import check_gpu, set_seeds, upload_model
import numpy as np
import tensorflow as tf
try:
from tensorflow_addons.optimizers import AdamW
except ModuleNotFoundError:
print("No TFA")
class Trainer(object):
    """End-to-end training driver for the toxicity models.

    Wires together data loading, preprocessing, balanced mini-batch
    generation, model construction, LR scheduling and callbacks, then runs
    either a single-split training (``train``) or training on everything
    but the held-out test set (``train_full_model``).
    """

    # The only optimizer names accepted by get_optimizer().
    OPTIMIZERS = ["Adam", "AdamW"]

    def __init__(
        self,
        optimizer_name,
        weight_decay,
        learning_rate,
        mb_size,
        train_epochs,
        content_loss_weight=1,
        language="en",
        scope='TOX',
        project=...,
        experiment_id="default",
        gradient_clipping=None,
        fold="time",
        seed=RANDOM_SEED,
        log_gradients=False,
        kw="",
        stopping_epoch=None,
        test=False,
    ):
        """Configure the trainer.

        Args:
          optimizer_name: one of OPTIMIZERS ("Adam" or "AdamW").
          weight_decay: weight decay, used only with AdamW.
          learning_rate: peak learning rate.
          mb_size: mini-batch size.
          train_epochs: number of training epochs.
          content_loss_weight: weight for the content head's loss; kept
            only when the experiment settings enable a dual head.
          language: data language.
          scope: 'TOX' or 'ABS'; selects the GCS address family.
          project: project id; also selects the per-project settings module.
          experiment_id: key into the settings file's experiment_settings.
          gradient_clipping: optional global clipnorm value.
          fold: CV fold strategy (default "time").
          seed: RNG seed.
          log_gradients: use the gradient-logging TensorBoard callback.
          kw: free-form suffix used in experiment/dir names.
          stopping_epoch: if set, a controlled-stopping checkpoint callback
            halts training at that epoch.
          test: quick smoke-test mode (short epochs, test log dirs).

        Raises:
          ValueError: unknown optimizer or scope, missing settings module,
            or unknown experiment id.
        """
        self.seed = seed
        self.weight_decay = weight_decay
        self.learning_rate = learning_rate
        self.mb_size = mb_size
        self.train_epochs = train_epochs
        self.gradient_clipping = gradient_clipping
        if optimizer_name not in self.OPTIMIZERS:
            raise ValueError(
                f"Optimizer {optimizer_name} not implemented. Accepted values {self.OPTIMIZERS}."
            )
        self.optimizer_name = optimizer_name
        self.log_gradients = log_gradients
        self.test = test
        self.fold = fold
        self.stopping_epoch = stopping_epoch
        self.language = language
        if scope == 'TOX':
            GCS_ADDRESS = TOX_GCS.format(project=project)
        elif scope == 'ABS':
            GCS_ADDRESS = ABS_GCS
        else:
            raise ValueError
        # NOTE(review): for scope 'TOX' this formats a second time; if
        # TOX_GCS consumed its {project} placeholder above, this is a
        # no-op — confirm the template's placeholders.
        GCS_ADDRESS = GCS_ADDRESS.format(project=project)
        try:
            self.setting_file = import_module(f"toxicity_ml_pipeline.settings.{scope.lower()}{project}_settings")
        except ModuleNotFoundError:
            raise ValueError(f"You need to define a setting file for your project {project}.")
        experiment_settings = self.setting_file.experiment_settings
        self.project = project
        self.remote_logdir = REMOTE_LOGDIR.format(GCS_ADDRESS=GCS_ADDRESS, project=project)
        self.model_dir = MODEL_DIR.format(GCS_ADDRESS=GCS_ADDRESS, project=project)
        if experiment_id not in experiment_settings:
            raise ValueError("This is not an experiment id as defined in the settings file.")
        # Overlay the experiment's settings on the defaults; every key
        # becomes an instance attribute (e.g. self.dual_head, self.model_type).
        for var, default_value in experiment_settings["default"].items():
            override_val = experiment_settings[experiment_id].get(var, default_value)
            print("Setting ", var, override_val)
            self.__setattr__(var, override_val)
        # Content loss only applies to dual-head models.
        self.content_loss_weight = content_loss_weight if self.dual_head else None
        self.mb_loader = BalancedMiniBatchLoader(
            fold=self.fold,
            seed=self.seed,
            perc_training_tox=self.perc_training_tox,
            mb_size=self.mb_size,
            n_outer_splits="time",
            scope=scope,
            project=project,
            dual_head=self.dual_head,
            sample_weights=self.sample_weights,
            huggingface=("bertweet" in self.model_type),
        )
        self._init_dirnames(kw=kw, experiment_id=experiment_id)
        print("------- Checking there is a GPU")
        check_gpu()

    def _init_dirnames(self, kw, experiment_id):
        """Derive the experiment name, log dir and checkpoint path."""
        kw = "test" if self.test else kw
        # Encode the relevant hyper-parameters into the run name.
        hyper_param_kw = ""
        if self.optimizer_name == "AdamW":
            hyper_param_kw += f"{self.weight_decay}_"
        if self.gradient_clipping:
            hyper_param_kw += f"{self.gradient_clipping}_"
        if self.content_loss_weight:
            hyper_param_kw += f"{self.content_loss_weight}_"
        experiment_name = (
            f"{self.language}{str(datetime.now()).replace(' ', '')[:-7]}{kw}_{experiment_id}{self.fold}_"
            f"{self.optimizer_name}_"
            f"{self.learning_rate}_"
            f"{hyper_param_kw}"
            f"{self.mb_size}_"
            f"{self.perc_training_tox}_"
            f"{self.train_epochs}_seed{self.seed}"
        )
        print("------- Experiment name: ", experiment_name)
        # Log-dir paths are redacted ("...") in the open-source release.
        self.logdir = (
            f"..."
            if self.test
            else f"..."
        )
        self.checkpoint_path = f"{self.model_dir}/{experiment_name}"

    @staticmethod
    def _additional_writers(logdir, metric_name):
        """Create a TensorBoard file writer under logdir/metric_name."""
        return tf.summary.create_file_writer(os.path.join(logdir, metric_name))

    def get_callbacks(self, fold, val_data, test_data):
        """Assemble TensorBoard, metric-logging and checkpoint callbacks for one fold."""
        fold_logdir = self.logdir + f"_fold{fold}"
        fold_checkpoint_path = self.checkpoint_path + f"_fold{fold}/{{epoch:02d}}"
        tb_args = {
            "log_dir": fold_logdir,
            "histogram_freq": 0,
            "update_freq": 500,
            "embeddings_freq": 0,
            "remote_logdir": f"{self.remote_logdir}_{self.language}"
            if not self.test
            else f"{self.remote_logdir}_test",
        }
        tensorboard_callback = (
            GradientLoggingTensorBoard(loader=self.mb_loader, val_data=val_data, freq=10, **tb_args)
            if self.log_gradients
            else SyncingTensorBoard(**tb_args)
        )
        callbacks = [tensorboard_callback]
        # bertweet models emit logits and need the HuggingFace dataset transform.
        if "bertweet" in self.model_type:
            from_logits = True
            dataset_transform_func = self.mb_loader.make_huggingface_tensorflow_ds
        else:
            from_logits = False
            dataset_transform_func = None
        fixed_recall = 0.85 if not self.dual_head else 0.5
        val_callback = AdditionalResultLogger(
            data=val_data,
            set_="validation",
            from_logits=from_logits,
            dataset_transform_func=dataset_transform_func,
            dual_head=self.dual_head,
            fixed_recall=fixed_recall
        )
        # NOTE(review): a constructor call never returns None, so this
        # check is always true — possibly a leftover from an earlier API.
        if val_callback is not None:
            callbacks.append(val_callback)
        test_callback = AdditionalResultLogger(
            data=test_data,
            set_="test",
            from_logits=from_logits,
            dataset_transform_func=dataset_transform_func,
            dual_head=self.dual_head,
            fixed_recall=fixed_recall
        )
        callbacks.append(test_callback)
        checkpoint_args = {
            "filepath": fold_checkpoint_path,
            "verbose": 0,
            "monitor": "val_pr_auc",
            "save_weights_only": True,
            "mode": "max",
            "save_freq": "epoch",
        }
        # NOTE(review): when stopping_epoch is unset no checkpoint callback
        # is registered at all — confirm that is intended.
        if self.stopping_epoch:
            checkpoint_callback = ControlledStoppingCheckpointCallback(
                **checkpoint_args,
                stopping_epoch=self.stopping_epoch,
                save_best_only=False,
            )
            callbacks.append(checkpoint_callback)
        return callbacks

    def get_lr_schedule(self, steps_per_epoch):
        """Return the LR schedule: optional linear decay wrapped in warm-up.

        Warm-up is only applied when the learning rate is >= 1e-3.
        """
        total_num_steps = steps_per_epoch * self.train_epochs
        warm_up_perc = WARM_UP_PERC if self.learning_rate >= 1e-3 else 0
        warm_up_steps = int(total_num_steps * warm_up_perc)
        if self.linear_lr_decay:
            learning_rate_fn = tf.keras.optimizers.schedules.PolynomialDecay(
                self.learning_rate,
                total_num_steps - warm_up_steps,
                end_learning_rate=0.0,
                power=1.0,
                cycle=False,
            )
        else:
            print('Constant learning rate')
            learning_rate_fn = self.learning_rate
        if warm_up_perc > 0:
            print(f".... using warm-up for {warm_up_steps} steps")
            warm_up_schedule = WarmUp(
                initial_learning_rate=self.learning_rate,
                decay_schedule_fn=learning_rate_fn,
                warmup_steps=warm_up_steps,
            )
            return warm_up_schedule
        return learning_rate_fn

    def get_optimizer(self, schedule):
        """Build the configured optimizer around the given LR schedule."""
        optim_args = {
            "learning_rate": schedule,
            "beta_1": 0.9,
            "beta_2": 0.999,
            "epsilon": 1e-6,
            "amsgrad": False,
        }
        if self.gradient_clipping:
            optim_args["global_clipnorm"] = self.gradient_clipping
            print(f".... {self.optimizer_name} w global clipnorm {self.gradient_clipping}")
        if self.optimizer_name == "Adam":
            return tf.keras.optimizers.Adam(**optim_args)
        if self.optimizer_name == "AdamW":
            optim_args["weight_decay"] = self.weight_decay
            return AdamW(**optim_args)
        raise NotImplementedError

    def get_training_actors(self, steps_per_epoch, val_data, test_data, fold):
        """Return (optimizer, callbacks) for a single fold."""
        callbacks = self.get_callbacks(fold=fold, val_data=val_data, test_data=test_data)
        schedule = self.get_lr_schedule(steps_per_epoch=steps_per_epoch)
        optimizer = self.get_optimizer(schedule)
        return optimizer, callbacks

    def load_data(self):
        """Load the raw dataframe for the configured project/language."""
        if self.project == 435 or self.project == 211:
            if self.dataset_type is None:
                data_loader = ENLoader(project=self.project, setting_file=self.setting_file)
                dataset_type_args = {}
            else:
                data_loader = ENLoaderWithSampling(project=self.project, setting_file=self.setting_file)
                dataset_type_args = self.dataset_type
        # NOTE(review): projects other than 435/211 reach here with
        # data_loader undefined and raise NameError — confirm intended.
        df = data_loader.load_data(
            language=self.language, test=self.test, reload=self.dataset_reload, **dataset_type_args
        )
        return df

    def preprocess(self, df):
        """Apply the configured preprocessing to the raw dataframe."""
        if self.project == 435 or self.project == 211:
            if self.preprocessing is None:
                data_prepro = DefaultENNoPreprocessor()
            elif self.preprocessing == "default":
                data_prepro = DefaultENPreprocessor()
            else:
                raise NotImplementedError
        # NOTE(review): projects other than 435/211 reach here with
        # data_prepro undefined and raise NameError — confirm intended.
        return data_prepro(
            df=df,
            label_column=self.label_column,
            class_weight=self.perc_training_tox if self.sample_weights == 'class_weight' else None,
            filter_low_agreements=self.filter_low_agreements,
            num_classes=self.num_classes,
        )

    def load_model(self, optimizer):
        """Build (and optionally warm-start) the model, compiled with *optimizer*."""
        # Initialise the output bias so initial predictions match the
        # positive-class prior ("smart bias init").
        smart_bias_value = (
            np.log(self.perc_training_tox / (1 - self.perc_training_tox)) if self.smart_bias_init else 0
        )
        model = load(
            optimizer,
            seed=self.seed,
            trainable=self.trainable,
            model_type=self.model_type,
            loss_name=self.loss_name,
            num_classes=self.num_classes,
            additional_layer=self.additional_layer,
            smart_bias_value=smart_bias_value,
            content_num_classes=self.content_num_classes,
            content_loss_name=self.content_loss_name,
            content_loss_weight=self.content_loss_weight
        )
        if self.model_reload is not False:
            # Warm-start from a previous checkpoint stored on GCS.
            model_folder = upload_model(full_gcs_model_path=os.path.join(self.model_dir, self.model_reload))
            model.load_weights(model_folder)
            if self.scratch_last_layer:
                print('Putting the last layer back to scratch')
                model.layers[-1] = get_last_layer(seed=self.seed,
                                                  num_classes=self.num_classes,
                                                  smart_bias_value=smart_bias_value)
        return model

    def _train_single_fold(self, mb_generator, test_data, steps_per_epoch, fold, val_data=None):
        """Train one model on one fold of mini-batches."""
        # Smoke-test mode shrinks an epoch to 100 steps.
        steps_per_epoch = 100 if self.test else steps_per_epoch
        optimizer, callbacks = self.get_training_actors(
            steps_per_epoch=steps_per_epoch, val_data=val_data, test_data=test_data, fold=fold
        )
        print("Loading model")
        model = self.load_model(optimizer)
        print(f"Nb of steps per epoch: {steps_per_epoch} ---- launching training")
        training_args = {
            "epochs": self.train_epochs,
            "steps_per_epoch": steps_per_epoch,
            "batch_size": self.mb_size,
            "callbacks": callbacks,
            "verbose": 2,
        }
        model.fit(mb_generator, **training_args)
        return

    def train_full_model(self):
        """Train a single model on everything but the held-out test set."""
        print("Setting up random seed.")
        set_seeds(self.seed)
        print(f"Loading {self.language} data")
        df = self.load_data()
        df = self.preprocess(df=df)
        print("Going to train on everything but the test dataset")
        mini_batches, test_data, steps_per_epoch = self.mb_loader.simple_cv_load(df)
        self._train_single_fold(
            mb_generator=mini_batches, test_data=test_data, steps_per_epoch=steps_per_epoch, fold="full"
        )

    def train(self):
        """Train with a train/val/test split (single fold for EN projects)."""
        print("Setting up random seed.")
        set_seeds(self.seed)
        print(f"Loading {self.language} data")
        df = self.load_data()
        df = self.preprocess(df=df)
        print("Loading MB generator")
        i = 0
        if self.project == 435 or self.project == 211:
            mb_generator, steps_per_epoch, val_data, test_data = self.mb_loader.no_cv_load(full_df=df)
            self._train_single_fold(
                mb_generator=mb_generator,
                val_data=val_data,
                test_data=test_data,
                steps_per_epoch=steps_per_epoch,
                fold=i,
            )
        else:
            raise ValueError("Sure you want to do multiple fold training")
            # NOTE(review): the cross-validation loop below is unreachable
            # because of the raise above — dead code kept for reference.
            for mb_generator, steps_per_epoch, val_data, test_data in self.mb_loader(full_df=df):
                self._train_single_fold(
                    mb_generator=mb_generator,
                    val_data=val_data,
                    test_data=test_data,
                    steps_per_epoch=steps_per_epoch,
                    fold=i,
                )
                i += 1
                if i == 3:
                    break

View File

@ -1,99 +0,0 @@
import bisect
import os
import random as python_random
import subprocess
from toxicity_ml_pipeline.settings.default_settings_tox import LOCAL_DIR
import numpy as np
from sklearn.metrics import precision_recall_curve
try:
import tensorflow as tf
except ModuleNotFoundError:
pass
def upload_model(full_gcs_model_path):
    """Fetch a model stored on GCS into the local models directory.

    Accepts either a model folder or a specific checkpoint path whose
    basename is an epoch number.

    Args:
      full_gcs_model_path: GCS path, with or without the "gs://" prefix.

    Returns:
      The local directory the model was synced to, or the local
      checkpoint prefix when an epoch path was given.
    """
    folder_name = full_gcs_model_path
    if folder_name[:5] != "gs://":
        folder_name = "gs://" + folder_name
    dirname = os.path.dirname(folder_name)
    epoch = os.path.basename(folder_name)
    # Local destination: <LOCAL_DIR>/models/<model-name>.
    # os.makedirs replaces the previous `mkdir` shell subprocess (and its
    # swallowed CalledProcessError): no shell involved, and an existing
    # directory is not an error.
    model_dir = os.path.join(LOCAL_DIR, "models")
    os.makedirs(model_dir, exist_ok=True)
    model_dir = os.path.join(model_dir, os.path.basename(dirname))
    os.makedirs(model_dir, exist_ok=True)
    try:
        _ = int(epoch)
    except ValueError:
        # Path points at a whole model folder: mirror it locally.
        cmd = f"gsutil rsync -r '{folder_name}' {model_dir}"
        weights_dir = model_dir
    else:
        # Path points at an epoch checkpoint: fetch the checkpoint index
        # plus every file belonging to that epoch.
        cmd = f"gsutil cp '{dirname}/checkpoint' {model_dir}/"
        execute_command(cmd)
        cmd = f"gsutil cp '{os.path.join(dirname, epoch)}*' {model_dir}/"
        weights_dir = f"{model_dir}/{epoch}"
    execute_command(cmd)
    return weights_dir
def compute_precision_fixed_recall(labels, preds, fixed_recall):
    """Return (precision, threshold) at the recall level closest above *fixed_recall*.

    ``precision_recall_curve`` yields recall in decreasing order, so the
    lookup negates the array to satisfy bisect's ascending-order contract.
    """
    precisions, recalls, thresholds = precision_recall_curve(y_true=labels, probas_pred=preds)
    idx = bisect.bisect_left(-recalls, -1 * fixed_recall)
    result = precisions[idx - 1]
    print(f"Precision at {recalls[idx-1]} recall: {result}")
    return result, thresholds[idx - 1]
def load_inference_func(model_folder):
    """Load a SavedModel from *model_folder* and return its default serving signature."""
    saved = tf.saved_model.load(model_folder, ["serve"])
    return saved.signatures["serving_default"]
def execute_query(client, query):
    """Run *query* on a BigQuery *client* and return the result as a DataFrame."""
    return client.query(query).result().to_dataframe()
def execute_command(cmd, print_=True):
    """Run *cmd* through the shell, raising CalledProcessError on non-zero exit.

    When *print_* is true, output is captured and both stderr and stdout
    are echoed; otherwise the child writes straight to our streams.
    """
    completed = subprocess.run(cmd, shell=True, capture_output=print_, check=True)
    if not print_:
        return
    print(completed.stderr.decode("utf-8"))
    print(completed.stdout.decode("utf-8"))
def check_gpu():
    """Fail fast when no usable GPU is present.

    Raises AttributeError when nvidia-smi is missing or failing, and
    ModuleNotFoundError when TensorFlow cannot see any GPU device.
    """
    try:
        execute_command("nvidia-smi")
    except subprocess.CalledProcessError:
        print("There is no GPU when there should be one.")
        raise AttributeError
    devices = tf.config.list_physical_devices("GPU")
    if not devices:
        raise ModuleNotFoundError("Tensorflow has not found the GPU. Check your installation")
    print(devices)
def set_seeds(seed):
    """Seed Python's random module, NumPy and TensorFlow for reproducibility."""
    python_random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)