mirror of
https://github.com/twitter/the-algorithm.git
synced 2024-06-13 22:58:54 +02:00
Delete navi/dr_transform directory
This commit is contained in:
parent
cc9390187f
commit
fd3a4d8cf2
|
@ -1,29 +0,0 @@
|
|||
[package]
|
||||
name = "dr_transform"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
[dependencies]
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
json = "0.12.4"
|
||||
bpr_thrift = { path = "../thrift_bpr_adapter/thrift/"}
|
||||
segdense = { path = "../segdense/"}
|
||||
thrift = "0.17.0"
|
||||
ndarray = "0.15"
|
||||
ort = {git ="https://github.com/pykeio/ort.git", tag="v1.14.2"}
|
||||
base64 = "0.20.0"
|
||||
npyz = "0.7.2"
|
||||
log = "0.4.17"
|
||||
env_logger = "0.9.0"
|
||||
prometheus = "0.13.1"
|
||||
once_cell = "1.17.0"
|
||||
rand = "0.8.5"
|
||||
itertools = "0.10.5"
|
||||
[dev-dependencies]
|
||||
criterion = "0.3.0"
|
||||
|
||||
[[bench]]
|
||||
name = "bpr_benchmark"
|
||||
harness = false
|
|
@ -1,48 +0,0 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use serde_json::Error;
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct AllConfig {
|
||||
#[serde(rename = "train_data")]
|
||||
pub train_data: TrainData,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct TrainData {
|
||||
#[serde(rename = "seg_dense_schema")]
|
||||
pub seg_dense_schema: SegDenseSchema,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SegDenseSchema {
|
||||
#[serde(rename = "renamed_features")]
|
||||
pub renamed_features: RenamedFeatures,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct RenamedFeatures {
|
||||
pub continuous: String,
|
||||
pub binary: String,
|
||||
pub discrete: String,
|
||||
#[serde(rename = "author_embedding")]
|
||||
pub author_embedding: String,
|
||||
#[serde(rename = "user_embedding")]
|
||||
pub user_embedding: String,
|
||||
#[serde(rename = "user_eng_embedding")]
|
||||
pub user_eng_embedding: String,
|
||||
#[serde(rename = "meta__author_id")]
|
||||
pub meta_author_id: String,
|
||||
#[serde(rename = "meta__user_id")]
|
||||
pub meta_user_id: String,
|
||||
#[serde(rename = "meta__tweet_id")]
|
||||
pub meta_tweet_id: String,
|
||||
}
|
||||
|
||||
pub fn parse(json_str: &str) -> Result<AllConfig, Error> {
|
||||
serde_json::from_str(json_str)
|
||||
}
|
|
@ -1,586 +0,0 @@
|
|||
use std::collections::BTreeSet;
|
||||
use std::fmt::{self, Debug, Display};
|
||||
use std::fs;
|
||||
|
||||
use bpr_thrift::data::DataRecord;
|
||||
use bpr_thrift::prediction_service::BatchPredictionRequest;
|
||||
use bpr_thrift::tensor::GeneralTensor;
|
||||
use log::debug;
|
||||
use ndarray::Array2;
|
||||
use once_cell::sync::OnceCell;
|
||||
use ort::tensor::InputTensor;
|
||||
use prometheus::{HistogramOpts, HistogramVec};
|
||||
use segdense::mapper::{FeatureMapper, MapReader};
|
||||
use segdense::segdense_transform_spec_home_recap_2022::{DensificationTransformSpec, Root};
|
||||
use segdense::util;
|
||||
use thrift::protocol::{TBinaryInputProtocol, TSerializable};
|
||||
use thrift::transport::TBufferChannel;
|
||||
|
||||
use crate::{all_config, all_config::AllConfig};
|
||||
|
||||
pub fn log_feature_match(
|
||||
dr: &DataRecord,
|
||||
seg_dense_config: &DensificationTransformSpec,
|
||||
dr_type: String,
|
||||
) {
|
||||
// Note the following algorithm matches features from config using linear search.
|
||||
// Also the record source is MinDataRecord. This includes only binary and continous features for now.
|
||||
|
||||
for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap() {
|
||||
debug!(
|
||||
"{dr_type} - Continuous Datarecord => Feature ID: {feature_id}, Feature value: {feature_value}"
|
||||
);
|
||||
for input_feature in &seg_dense_config.cont.input_features {
|
||||
if input_feature.feature_id == *feature_id {
|
||||
debug!("Matching input feature: {input_feature:?}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for feature_id in dr.binary_features.as_ref().unwrap() {
|
||||
debug!("{dr_type} - Binary Datarecord => Feature ID: {feature_id}");
|
||||
for input_feature in &seg_dense_config.binary.input_features {
|
||||
if input_feature.feature_id == *feature_id {
|
||||
debug!("Found input feature: {input_feature:?}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn log_feature_matches(drs: &Vec<DataRecord>, seg_dense_config: &DensificationTransformSpec) {
|
||||
for dr in drs {
|
||||
log_feature_match(dr, seg_dense_config, String::from("individual"));
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Converter: Send + Sync + Debug + 'static + Display {
|
||||
fn convert(&self, input: Vec<Vec<u8>>) -> (Vec<InputTensor>, Vec<usize>);
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
pub struct BatchPredictionRequestToTorchTensorConverter {
|
||||
all_config: AllConfig,
|
||||
seg_dense_config: Root,
|
||||
all_config_path: String,
|
||||
seg_dense_config_path: String,
|
||||
feature_mapper: FeatureMapper,
|
||||
user_embedding_feature_id: i64,
|
||||
user_eng_embedding_feature_id: i64,
|
||||
author_embedding_feature_id: i64,
|
||||
discrete_features_to_report: BTreeSet<i64>,
|
||||
continuous_features_to_report: BTreeSet<i64>,
|
||||
discrete_feature_metrics: &'static HistogramVec,
|
||||
continuous_feature_metrics: &'static HistogramVec,
|
||||
}
|
||||
|
||||
impl Display for BatchPredictionRequestToTorchTensorConverter {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"all_config_path: {}, seg_dense_config_path:{}",
|
||||
self.all_config_path, self.seg_dense_config_path
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl BatchPredictionRequestToTorchTensorConverter {
|
||||
pub fn new(
|
||||
model_dir: &str,
|
||||
model_version: &str,
|
||||
reporting_feature_ids: Vec<(i64, &str)>,
|
||||
register_metric_fn: Option<impl Fn(&HistogramVec)>,
|
||||
) -> BatchPredictionRequestToTorchTensorConverter {
|
||||
let all_config_path = format!("{model_dir}/{model_version}/all_config.json");
|
||||
let seg_dense_config_path =
|
||||
format!("{model_dir}/{model_version}/segdense_transform_spec_home_recap_2022.json");
|
||||
let seg_dense_config = util::load_config(&seg_dense_config_path);
|
||||
let all_config = all_config::parse(
|
||||
&fs::read_to_string(&all_config_path)
|
||||
.unwrap_or_else(|error| panic!("error loading all_config.json - {error}")),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let feature_mapper = util::load_from_parsed_config_ref(&seg_dense_config);
|
||||
|
||||
let user_embedding_feature_id = Self::get_feature_id(
|
||||
&all_config
|
||||
.train_data
|
||||
.seg_dense_schema
|
||||
.renamed_features
|
||||
.user_embedding,
|
||||
&seg_dense_config,
|
||||
);
|
||||
let user_eng_embedding_feature_id = Self::get_feature_id(
|
||||
&all_config
|
||||
.train_data
|
||||
.seg_dense_schema
|
||||
.renamed_features
|
||||
.user_eng_embedding,
|
||||
&seg_dense_config,
|
||||
);
|
||||
let author_embedding_feature_id = Self::get_feature_id(
|
||||
&all_config
|
||||
.train_data
|
||||
.seg_dense_schema
|
||||
.renamed_features
|
||||
.author_embedding,
|
||||
&seg_dense_config,
|
||||
);
|
||||
static METRICS: OnceCell<(HistogramVec, HistogramVec)> = OnceCell::new();
|
||||
let (discrete_feature_metrics, continuous_feature_metrics) = METRICS.get_or_init(|| {
|
||||
let discrete = HistogramVec::new(
|
||||
HistogramOpts::new(":navi:feature_id:discrete", "Discrete Feature ID values")
|
||||
.buckets(Vec::from([
|
||||
0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
|
||||
120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0,
|
||||
300.0, 500.0, 1000.0, 10000.0, 100000.0,
|
||||
])),
|
||||
&["feature_id"],
|
||||
)
|
||||
.expect("metric cannot be created");
|
||||
let continuous = HistogramVec::new(
|
||||
HistogramOpts::new(
|
||||
":navi:feature_id:continuous",
|
||||
"continuous Feature ID values",
|
||||
)
|
||||
.buckets(Vec::from([
|
||||
0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
|
||||
120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0,
|
||||
500.0, 1000.0, 10000.0, 100000.0,
|
||||
])),
|
||||
&["feature_id"],
|
||||
)
|
||||
.expect("metric cannot be created");
|
||||
if let Some(r) = register_metric_fn {
|
||||
r(&discrete);
|
||||
r(&continuous);
|
||||
}
|
||||
(discrete, continuous)
|
||||
});
|
||||
|
||||
let mut discrete_features_to_report = BTreeSet::new();
|
||||
let mut continuous_features_to_report = BTreeSet::new();
|
||||
|
||||
for (feature_id, feature_type) in reporting_feature_ids.iter() {
|
||||
match *feature_type {
|
||||
"discrete" => discrete_features_to_report.insert(*feature_id),
|
||||
"continuous" => continuous_features_to_report.insert(*feature_id),
|
||||
_ => panic!("Invalid feature type {feature_type} for reporting metrics!"),
|
||||
};
|
||||
}
|
||||
|
||||
BatchPredictionRequestToTorchTensorConverter {
|
||||
all_config,
|
||||
seg_dense_config,
|
||||
all_config_path,
|
||||
seg_dense_config_path,
|
||||
feature_mapper,
|
||||
user_embedding_feature_id,
|
||||
user_eng_embedding_feature_id,
|
||||
author_embedding_feature_id,
|
||||
discrete_features_to_report,
|
||||
continuous_features_to_report,
|
||||
discrete_feature_metrics,
|
||||
continuous_feature_metrics,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_feature_id(feature_name: &str, seg_dense_config: &Root) -> i64 {
|
||||
// given a feature name, we get the complex feature type id
|
||||
for feature in &seg_dense_config.complex_feature_type_transform_spec {
|
||||
if feature.full_feature_name == feature_name {
|
||||
return feature.feature_id;
|
||||
}
|
||||
}
|
||||
-1
|
||||
}
|
||||
|
||||
fn parse_batch_prediction_request(bytes: Vec<u8>) -> BatchPredictionRequest {
|
||||
// parse batch prediction request into a struct from byte array repr.
|
||||
let mut bc = TBufferChannel::with_capacity(bytes.len(), 0);
|
||||
bc.set_readable_bytes(&bytes);
|
||||
let mut protocol = TBinaryInputProtocol::new(bc, true);
|
||||
BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap()
|
||||
}
|
||||
|
||||
fn get_embedding_tensors(
|
||||
&self,
|
||||
bprs: &[BatchPredictionRequest],
|
||||
feature_id: i64,
|
||||
batch_size: &[usize],
|
||||
) -> Array2<f32> {
|
||||
// given an embedding feature id, extract the float tensor array into tensors.
|
||||
let cols: usize = 200;
|
||||
let rows: usize = batch_size[batch_size.len() - 1];
|
||||
let total_size = rows * cols;
|
||||
|
||||
let mut working_set = vec![0 as f32; total_size];
|
||||
let mut bpr_start = 0;
|
||||
for (bpr, &bpr_end) in bprs.iter().zip(batch_size) {
|
||||
if bpr.common_features.is_some()
|
||||
&& bpr.common_features.as_ref().unwrap().tensors.is_some()
|
||||
&& bpr
|
||||
.common_features
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.tensors
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.contains_key(&feature_id)
|
||||
{
|
||||
let source_tensor = bpr
|
||||
.common_features
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.tensors
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.get(&feature_id)
|
||||
.unwrap();
|
||||
let tensor = match source_tensor {
|
||||
GeneralTensor::FloatTensor(float_tensor) =>
|
||||
//Tensor::of_slice(
|
||||
{
|
||||
float_tensor
|
||||
.floats
|
||||
.iter()
|
||||
.map(|x| x.into_inner() as f32)
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
_ => vec![0 as f32; cols],
|
||||
};
|
||||
|
||||
// since the tensor is found in common feature, add it in all batches
|
||||
for row in bpr_start..bpr_end {
|
||||
for col in 0..cols {
|
||||
working_set[row * cols + col] = tensor[col];
|
||||
}
|
||||
}
|
||||
}
|
||||
// find the feature in individual feature list and add to corresponding batch.
|
||||
for (index, datarecord) in bpr.individual_features_list.iter().enumerate() {
|
||||
if datarecord.tensors.is_some()
|
||||
&& datarecord
|
||||
.tensors
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.contains_key(&feature_id)
|
||||
{
|
||||
let source_tensor = datarecord
|
||||
.tensors
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.get(&feature_id)
|
||||
.unwrap();
|
||||
let tensor = match source_tensor {
|
||||
GeneralTensor::FloatTensor(float_tensor) => float_tensor
|
||||
.floats
|
||||
.iter()
|
||||
.map(|x| x.into_inner() as f32)
|
||||
.collect::<Vec<_>>(),
|
||||
_ => vec![0 as f32; cols],
|
||||
};
|
||||
for col in 0..cols {
|
||||
working_set[(bpr_start + index) * cols + col] = tensor[col];
|
||||
}
|
||||
}
|
||||
}
|
||||
bpr_start = bpr_end;
|
||||
}
|
||||
Array2::<f32>::from_shape_vec([rows, cols], working_set).unwrap()
|
||||
}
|
||||
|
||||
// Todo : Refactor, create a generic version with different type and field accessors
|
||||
// Example paramterize and then instiantiate the following
|
||||
// (FLOAT --> FLOAT, DataRecord.continuous_feature)
|
||||
// (BOOL --> INT64, DataRecord.binary_feature)
|
||||
// (INT64 --> INT64, DataRecord.discrete_feature)
|
||||
fn get_continuous(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
|
||||
// These need to be part of model schema
|
||||
let rows = batch_ends[batch_ends.len() - 1];
|
||||
let cols = 5293;
|
||||
let full_size = rows * cols;
|
||||
let default_val = f32::NAN;
|
||||
|
||||
let mut tensor = vec![default_val; full_size];
|
||||
|
||||
let mut bpr_start = 0;
|
||||
for (bpr, &bpr_end) in bprs.iter().zip(batch_ends) {
|
||||
// Common features
|
||||
if bpr.common_features.is_some()
|
||||
&& bpr
|
||||
.common_features
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.continuous_features
|
||||
.is_some()
|
||||
{
|
||||
let common_features = bpr
|
||||
.common_features
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.continuous_features
|
||||
.as_ref()
|
||||
.unwrap();
|
||||
|
||||
for feature in common_features {
|
||||
if let Some(f_info) = self.feature_mapper.get(feature.0) {
|
||||
let idx = f_info.index_within_tensor as usize;
|
||||
if idx < cols {
|
||||
// Set value in each row
|
||||
for r in bpr_start..bpr_end {
|
||||
let flat_index = r * cols + idx;
|
||||
tensor[flat_index] = feature.1.into_inner() as f32;
|
||||
}
|
||||
}
|
||||
}
|
||||
if self.continuous_features_to_report.contains(feature.0) {
|
||||
self.continuous_feature_metrics
|
||||
.with_label_values(&[feature.0.to_string().as_str()])
|
||||
.observe(feature.1.into_inner())
|
||||
} else if self.discrete_features_to_report.contains(feature.0) {
|
||||
self.discrete_feature_metrics
|
||||
.with_label_values(&[feature.0.to_string().as_str()])
|
||||
.observe(feature.1.into_inner())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process the batch of datarecords
|
||||
for r in bpr_start..bpr_end {
|
||||
let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start];
|
||||
if dr.continuous_features.is_some() {
|
||||
for feature in dr.continuous_features.as_ref().unwrap() {
|
||||
if let Some(f_info) = self.feature_mapper.get(feature.0) {
|
||||
let idx = f_info.index_within_tensor as usize;
|
||||
let flat_index = r * cols + idx;
|
||||
if flat_index < tensor.len() && idx < cols {
|
||||
tensor[flat_index] = feature.1.into_inner() as f32;
|
||||
}
|
||||
}
|
||||
if self.continuous_features_to_report.contains(feature.0) {
|
||||
self.continuous_feature_metrics
|
||||
.with_label_values(&[feature.0.to_string().as_str()])
|
||||
.observe(feature.1.into_inner())
|
||||
} else if self.discrete_features_to_report.contains(feature.0) {
|
||||
self.discrete_feature_metrics
|
||||
.with_label_values(&[feature.0.to_string().as_str()])
|
||||
.observe(feature.1.into_inner())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
bpr_start = bpr_end;
|
||||
}
|
||||
|
||||
InputTensor::FloatTensor(
|
||||
Array2::<f32>::from_shape_vec([rows, cols], tensor)
|
||||
.unwrap()
|
||||
.into_dyn(),
|
||||
)
|
||||
}
|
||||
|
||||
fn get_binary(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
|
||||
// These need to be part of model schema
|
||||
let rows = batch_ends[batch_ends.len() - 1];
|
||||
let cols = 149;
|
||||
let full_size = rows * cols;
|
||||
let default_val = 0;
|
||||
|
||||
let mut v = vec![default_val; full_size];
|
||||
|
||||
let mut bpr_start = 0;
|
||||
for (bpr, &bpr_end) in bprs.iter().zip(batch_ends) {
|
||||
// Common features
|
||||
if bpr.common_features.is_some()
|
||||
&& bpr
|
||||
.common_features
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.binary_features
|
||||
.is_some()
|
||||
{
|
||||
let common_features = bpr
|
||||
.common_features
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.binary_features
|
||||
.as_ref()
|
||||
.unwrap();
|
||||
|
||||
for feature in common_features {
|
||||
if let Some(f_info) = self.feature_mapper.get(feature) {
|
||||
let idx = f_info.index_within_tensor as usize;
|
||||
if idx < cols {
|
||||
// Set value in each row
|
||||
for r in bpr_start..bpr_end {
|
||||
let flat_index = r * cols + idx;
|
||||
v[flat_index] = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process the batch of datarecords
|
||||
for r in bpr_start..bpr_end {
|
||||
let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start];
|
||||
if dr.binary_features.is_some() {
|
||||
for feature in dr.binary_features.as_ref().unwrap() {
|
||||
if let Some(f_info) = self.feature_mapper.get(feature) {
|
||||
let idx = f_info.index_within_tensor as usize;
|
||||
let flat_index = r * cols + idx;
|
||||
v[flat_index] = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
bpr_start = bpr_end;
|
||||
}
|
||||
InputTensor::Int64Tensor(
|
||||
Array2::<i64>::from_shape_vec([rows, cols], v)
|
||||
.unwrap()
|
||||
.into_dyn(),
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn get_discrete(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
|
||||
// These need to be part of model schema
|
||||
let rows = batch_ends[batch_ends.len() - 1];
|
||||
let cols = 320;
|
||||
let full_size = rows * cols;
|
||||
let default_val = 0;
|
||||
|
||||
let mut v = vec![default_val; full_size];
|
||||
|
||||
let mut bpr_start = 0;
|
||||
for (bpr, &bpr_end) in bprs.iter().zip(batch_ends) {
|
||||
// Common features
|
||||
if bpr.common_features.is_some()
|
||||
&& bpr
|
||||
.common_features
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.discrete_features
|
||||
.is_some()
|
||||
{
|
||||
let common_features = bpr
|
||||
.common_features
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.discrete_features
|
||||
.as_ref()
|
||||
.unwrap();
|
||||
|
||||
for feature in common_features {
|
||||
if let Some(f_info) = self.feature_mapper.get(feature.0) {
|
||||
let idx = f_info.index_within_tensor as usize;
|
||||
if idx < cols {
|
||||
// Set value in each row
|
||||
for r in bpr_start..bpr_end {
|
||||
let flat_index = r * cols + idx;
|
||||
v[flat_index] = *feature.1;
|
||||
}
|
||||
}
|
||||
}
|
||||
if self.discrete_features_to_report.contains(feature.0) {
|
||||
self.discrete_feature_metrics
|
||||
.with_label_values(&[feature.0.to_string().as_str()])
|
||||
.observe(*feature.1 as f64)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process the batch of datarecords
|
||||
for r in bpr_start..bpr_end {
|
||||
let dr: &DataRecord = &bpr.individual_features_list[r];
|
||||
if dr.discrete_features.is_some() {
|
||||
for feature in dr.discrete_features.as_ref().unwrap() {
|
||||
if let Some(f_info) = self.feature_mapper.get(feature.0) {
|
||||
let idx = f_info.index_within_tensor as usize;
|
||||
let flat_index = r * cols + idx;
|
||||
if flat_index < v.len() && idx < cols {
|
||||
v[flat_index] = *feature.1;
|
||||
}
|
||||
}
|
||||
if self.discrete_features_to_report.contains(feature.0) {
|
||||
self.discrete_feature_metrics
|
||||
.with_label_values(&[feature.0.to_string().as_str()])
|
||||
.observe(*feature.1 as f64)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
bpr_start = bpr_end;
|
||||
}
|
||||
InputTensor::Int64Tensor(
|
||||
Array2::<i64>::from_shape_vec([rows, cols], v)
|
||||
.unwrap()
|
||||
.into_dyn(),
|
||||
)
|
||||
}
|
||||
|
||||
fn get_user_embedding(
|
||||
&self,
|
||||
bprs: &[BatchPredictionRequest],
|
||||
batch_ends: &[usize],
|
||||
) -> InputTensor {
|
||||
InputTensor::FloatTensor(
|
||||
self.get_embedding_tensors(bprs, self.user_embedding_feature_id, batch_ends)
|
||||
.into_dyn(),
|
||||
)
|
||||
}
|
||||
|
||||
fn get_eng_embedding(
|
||||
&self,
|
||||
bpr: &[BatchPredictionRequest],
|
||||
batch_ends: &[usize],
|
||||
) -> InputTensor {
|
||||
InputTensor::FloatTensor(
|
||||
self.get_embedding_tensors(bpr, self.user_eng_embedding_feature_id, batch_ends)
|
||||
.into_dyn(),
|
||||
)
|
||||
}
|
||||
|
||||
fn get_author_embedding(
|
||||
&self,
|
||||
bpr: &[BatchPredictionRequest],
|
||||
batch_ends: &[usize],
|
||||
) -> InputTensor {
|
||||
InputTensor::FloatTensor(
|
||||
self.get_embedding_tensors(bpr, self.author_embedding_feature_id, batch_ends)
|
||||
.into_dyn(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Converter for BatchPredictionRequestToTorchTensorConverter {
|
||||
fn convert(&self, batched_bytes: Vec<Vec<u8>>) -> (Vec<InputTensor>, Vec<usize>) {
|
||||
let bprs = batched_bytes
|
||||
.into_iter()
|
||||
.map(|bytes| {
|
||||
BatchPredictionRequestToTorchTensorConverter::parse_batch_prediction_request(bytes)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let batch_ends = bprs
|
||||
.iter()
|
||||
.map(|bpr| bpr.individual_features_list.len())
|
||||
.scan(0usize, |acc, e| {
|
||||
//running total
|
||||
*acc += e;
|
||||
Some(*acc)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let t1 = self.get_continuous(&bprs, &batch_ends);
|
||||
let t2 = self.get_binary(&bprs, &batch_ends);
|
||||
//let _t3 = self.get_discrete(&bprs, &batch_ends);
|
||||
let t4 = self.get_user_embedding(&bprs, &batch_ends);
|
||||
let t5 = self.get_eng_embedding(&bprs, &batch_ends);
|
||||
let t6 = self.get_author_embedding(&bprs, &batch_ends);
|
||||
|
||||
(vec![t1, t2, t4, t5, t6], batch_ends)
|
||||
}
|
||||
}
|
|
@ -1,5 +0,0 @@
|
|||
pub mod all_config;
|
||||
pub mod converter;
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
pub mod util;
|
|
@ -1,32 +0,0 @@
|
|||
use npyz::WriterBuilder;
|
||||
use npyz::{AutoSerialize, WriteOptions};
|
||||
use std::io::BufWriter;
|
||||
use std::{
|
||||
fs::File,
|
||||
io::{self, BufRead},
|
||||
};
|
||||
|
||||
pub fn load_batch_prediction_request_base64(file_name: &str) -> Vec<Vec<u8>> {
|
||||
let file = File::open(file_name).expect("could not read file");
|
||||
let mut result = vec![];
|
||||
for (mut line_count, line) in io::BufReader::new(file).lines().enumerate() {
|
||||
line_count += 1;
|
||||
match base64::decode(line.unwrap().trim()) {
|
||||
Ok(payload) => result.push(payload),
|
||||
Err(err) => println!("error decoding line {file_name}:{line_count} - {err}"),
|
||||
}
|
||||
}
|
||||
println!("result len: {}", result.len());
|
||||
result
|
||||
}
|
||||
|
||||
pub fn save_to_npy<T: npyz::Serialize + AutoSerialize>(data: &[T], save_to: String) {
|
||||
let mut writer = WriteOptions::new()
|
||||
.default_dtype()
|
||||
.shape(&[data.len() as u64, 1])
|
||||
.writer(BufWriter::new(File::create(save_to).unwrap()))
|
||||
.begin_nd()
|
||||
.unwrap();
|
||||
writer.extend(data.to_owned()).unwrap();
|
||||
writer.finish().unwrap();
|
||||
}
|
Loading…
Reference in New Issue
Block a user