Dependency refactoring and minor formatting

splurf 2023-04-05 01:47:55 -04:00
parent 949de1a5b1
commit b867310ad5
28 changed files with 666 additions and 559 deletions

.gitignore (vendored), 2 changed lines
View File

@ -1,2 +1,2 @@
.DS_Store
navi/**/target

View File

@ -354,9 +354,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.140"
version = "0.2.141"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c"
checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5"
[[package]]
name = "lock_api"

View File

@ -1,23 +1,28 @@
use std::collections::BTreeSet;
use std::fmt::{self, Debug, Display};
use std::fs;
use bpr_thrift::data::DataRecord;
use bpr_thrift::prediction_service::BatchPredictionRequest;
use bpr_thrift::tensor::GeneralTensor;
use log::debug;
use ndarray::Array2;
use once_cell::sync::OnceCell;
use ort::tensor::InputTensor;
use prometheus::{HistogramOpts, HistogramVec};
use segdense::mapper::{FeatureMapper, MapReader};
use segdense::segdense_transform_spec_home_recap_2022::{DensificationTransformSpec, Root};
use segdense::util;
use thrift::protocol::{TBinaryInputProtocol, TSerializable};
use thrift::transport::TBufferChannel;
use crate::all_config;
use crate::all_config::AllConfig;
use {
super::all_config::{self, AllConfig},
bpr_thrift::{
data::DataRecord, prediction_service::BatchPredictionRequest, tensor::GeneralTensor,
},
log::debug,
ndarray::Array2,
once_cell::sync::OnceCell,
ort::tensor::InputTensor,
prometheus::{HistogramOpts, HistogramVec},
segdense::{
mapper::{FeatureMapper, MapReader},
segdense_transform_spec_home_recap_2022::{DensificationTransformSpec, Root},
util,
},
std::{
collections::BTreeSet,
fmt::{self, Debug, Display},
fs,
},
thrift::{
protocol::{TBinaryInputProtocol, TSerializable},
transport::TBufferChannel,
},
};
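A minimal std-only sketch of the nested use-tree style adopted in the hunk above (hypothetical helper function, not from this repo):

// One nested use-tree instead of several separate `use` statements.
use {
    std::collections::BTreeSet,
    std::fmt::Debug,
};

// Hypothetical helper: collects items into an ordered set and debug-prints it.
fn print_sorted<T: Ord + Debug>(items: Vec<T>) {
    let set: BTreeSet<T> = items.into_iter().collect();
    println!("{set:?}");
}

fn main() {
    print_sorted(vec![3, 1, 2]);
}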
pub fn log_feature_match(
dr: &DataRecord,
@ -27,26 +32,22 @@ pub fn log_feature_match(
// Note the following algorithm matches features from config using linear search.
// Also the record source is MinDataRecord. This includes only binary and continous features for now.
for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap().into_iter() {
for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap() {
debug!(
"{} - Continous Datarecord => Feature ID: {}, Feature value: {}",
dr_type, feature_id, feature_value
"{dr_type} - Continuous Datarecord => Feature ID: {feature_id}, Feature value: {feature_value}"
);
for input_feature in &seg_dense_config.cont.input_features {
if input_feature.feature_id == *feature_id {
debug!("Matching input feature: {:?}", input_feature)
debug!("Matching input feature: {input_feature:?}")
}
}
}
for feature_id in dr.binary_features.as_ref().unwrap().into_iter() {
debug!(
"{} - Binary Datarecord => Feature ID: {}",
dr_type, feature_id
);
for feature_id in dr.binary_features.as_ref().unwrap() {
debug!("{dr_type} - Binary Datarecord => Feature ID: {feature_id}");
for input_feature in &seg_dense_config.binary.input_features {
if input_feature.feature_id == *feature_id {
debug!("Found input feature: {:?}", input_feature)
debug!("Found input feature: {input_feature:?}")
}
}
}
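The hunk above also switches `debug!` calls to inline captured format arguments (Rust 2021). A tiny standalone sketch of the before and after, with made-up values:

fn main() {
    let dr_type = "common";
    let feature_id = 42_i64;
    let feature_value = 0.5_f64;

    // Before: positional arguments.
    let old = format!(
        "{} - Continuous Datarecord => Feature ID: {}, Feature value: {}",
        dr_type, feature_id, feature_value
    );
    // After: identifiers captured directly in the braces (Rust 2021 and later).
    let new = format!(
        "{dr_type} - Continuous Datarecord => Feature ID: {feature_id}, Feature value: {feature_value}"
    );

    assert_eq!(old, new);
    println!("{new}");
}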
@ -96,15 +97,13 @@ impl BatchPredictionRequestToTorchTensorConverter {
reporting_feature_ids: Vec<(i64, &str)>,
register_metric_fn: Option<impl Fn(&HistogramVec)>,
) -> BatchPredictionRequestToTorchTensorConverter {
let all_config_path = format!("{}/{}/all_config.json", model_dir, model_version);
let seg_dense_config_path = format!(
"{}/{}/segdense_transform_spec_home_recap_2022.json",
model_dir, model_version
);
let all_config_path = format!("{model_dir}/{model_version}/all_config.json");
let seg_dense_config_path =
format!("{model_dir}/{model_version}/segdense_transform_spec_home_recap_2022.json");
let seg_dense_config = util::load_config(&seg_dense_config_path);
let all_config = all_config::parse(
&fs::read_to_string(&all_config_path)
.unwrap_or_else(|error| panic!("error loading all_config.json - {}", error)),
.unwrap_or_else(|error| panic!("error loading all_config.json - {error}")),
)
.unwrap();
@ -138,11 +137,11 @@ impl BatchPredictionRequestToTorchTensorConverter {
let (discrete_feature_metrics, continuous_feature_metrics) = METRICS.get_or_init(|| {
let discrete = HistogramVec::new(
HistogramOpts::new(":navi:feature_id:discrete", "Discrete Feature ID values")
.buckets(Vec::from(&[
0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
.buckets(Vec::from([
0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0,
300.0, 500.0, 1000.0, 10000.0, 100000.0,
] as &'static [f64])),
])),
&["feature_id"],
)
.expect("metric cannot be created");
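The bucket list above drops the `as &'static [f64]` cast: `Vec::from` accepts an array by value, and a single `0.0f64` suffix is enough to pin the element type. A tiny sketch of the two spellings:

fn main() {
    // Old style: borrow a slice, elements get cloned out of it.
    let from_slice: Vec<f64> = Vec::from(&[0.0, 10.0, 20.0][..]);
    // New style: hand the array over by value; the f64 suffix fixes the element type.
    let from_array = Vec::from([0.0f64, 10.0, 20.0]);
    assert_eq!(from_slice, from_array);
}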
@ -151,18 +150,18 @@ impl BatchPredictionRequestToTorchTensorConverter {
":navi:feature_id:continuous",
"continuous Feature ID values",
)
.buckets(Vec::from(&[
0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0,
130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0, 500.0,
1000.0, 10000.0, 100000.0,
] as &'static [f64])),
.buckets(Vec::from([
0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0,
500.0, 1000.0, 10000.0, 100000.0,
])),
&["feature_id"],
)
.expect("metric cannot be created");
register_metric_fn.map(|r| {
if let Some(r) = register_metric_fn {
r(&discrete);
r(&continuous);
});
}
(discrete, continuous)
});
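The `register_metric_fn.map(...)` call is rewritten as `if let Some(...)` because `Option::map` is meant to produce a value, not to run side effects (clippy's `option_map_unit_fn` flags the old form). A small sketch with a plain closure standing in for the metric-registration callback:

fn main() {
    let register: Option<Box<dyn Fn(&str)>> =
        Some(Box::new(|name: &str| println!("registered {name}")));

    // `if let` makes "run this only when a callback was supplied" explicit, where
    // `register.map(|r| ...)` would build and discard an Option<()> just for the side effect.
    if let Some(r) = register {
        r("discrete");
        r("continuous");
    }
}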
@ -171,16 +170,13 @@ impl BatchPredictionRequestToTorchTensorConverter {
for (feature_id, feature_type) in reporting_feature_ids.iter() {
match *feature_type {
"discrete" => discrete_features_to_report.insert(feature_id.clone()),
"continuous" => continuous_features_to_report.insert(feature_id.clone()),
_ => panic!(
"Invalid feature type {} for reporting metrics!",
feature_type
),
"discrete" => discrete_features_to_report.insert(*feature_id),
"continuous" => continuous_features_to_report.insert(*feature_id),
_ => panic!("Invalid feature type {feature_type} for reporting metrics!"),
};
}
return BatchPredictionRequestToTorchTensorConverter {
BatchPredictionRequestToTorchTensorConverter {
all_config,
seg_dense_config,
all_config_path,
@ -193,7 +189,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
continuous_features_to_report,
discrete_feature_metrics,
continuous_feature_metrics,
};
}
}
fn get_feature_id(feature_name: &str, seg_dense_config: &Root) -> i64 {
@ -203,7 +199,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
return feature.feature_id;
}
}
return -1;
-1
}
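This hunk and several below drop trailing `return` statements: the last expression of a Rust block is already its value. A two-line illustration with a hypothetical helper:

// Hypothetical helper mirroring get_feature_id's fallback behavior.
fn feature_id_or_default(found: Option<i64>) -> i64 {
    // The `if let` expression is the block's final expression, so it is the return value.
    if let Some(id) = found { id } else { -1 }
}

fn main() {
    assert_eq!(feature_id_or_default(Some(7)), 7);
    assert_eq!(feature_id_or_default(None), -1);
}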
fn parse_batch_prediction_request(bytes: Vec<u8>) -> BatchPredictionRequest {
@ -211,7 +207,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
let mut bc = TBufferChannel::with_capacity(bytes.len(), 0);
bc.set_readable_bytes(&bytes);
let mut protocol = TBinaryInputProtocol::new(bc, true);
return BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap();
BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap()
}
fn get_embedding_tensors(
@ -228,45 +224,43 @@ impl BatchPredictionRequestToTorchTensorConverter {
let mut working_set = vec![0 as f32; total_size];
let mut bpr_start = 0;
for (bpr, &bpr_end) in bprs.iter().zip(batch_size) {
if bpr.common_features.is_some() {
if bpr.common_features.as_ref().unwrap().tensors.is_some() {
if bpr
.common_features
.as_ref()
.unwrap()
.tensors
.as_ref()
.unwrap()
.contains_key(&feature_id)
if bpr.common_features.is_some()
&& bpr.common_features.as_ref().unwrap().tensors.is_some()
&& bpr
.common_features
.as_ref()
.unwrap()
.tensors
.as_ref()
.unwrap()
.contains_key(&feature_id)
{
let source_tensor = bpr
.common_features
.as_ref()
.unwrap()
.tensors
.as_ref()
.unwrap()
.get(&feature_id)
.unwrap();
let tensor = match source_tensor {
GeneralTensor::FloatTensor(float_tensor) =>
//Tensor::of_slice(
{
let source_tensor = bpr
.common_features
.as_ref()
.unwrap()
.tensors
.as_ref()
.unwrap()
.get(&feature_id)
.unwrap();
let tensor = match source_tensor {
GeneralTensor::FloatTensor(float_tensor) =>
//Tensor::of_slice(
{
float_tensor
.floats
.iter()
.map(|x| x.into_inner() as f32)
.collect::<Vec<_>>()
}
_ => vec![0 as f32; cols],
};
float_tensor
.floats
.iter()
.map(|x| x.into_inner() as f32)
.collect::<Vec<_>>()
}
_ => vec![0 as f32; cols],
};
// since the tensor is found in common feature, add it in all batches
for row in bpr_start..bpr_end {
for col in 0..cols {
working_set[row * cols + col] = tensor[col];
}
}
// since the tensor is found in common feature, add it in all batches
for row in bpr_start..bpr_end {
for col in 0..cols {
working_set[row * cols + col] = tensor[col];
}
}
}
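The `is_some()`/`unwrap()` ladder above is flattened into one `&&` condition. An alternative spelling of the same optional lookup, shown here only as a std-only sketch with hypothetical types, is chained `and_then`, which avoids `unwrap()` entirely:

use std::collections::HashMap;

struct CommonFeatures {
    tensors: Option<HashMap<i64, Vec<f32>>>,
}

struct Bpr {
    common_features: Option<CommonFeatures>,
}

// Returns the tensor for `feature_id` only if every optional layer is present.
fn lookup(bpr: &Bpr, feature_id: i64) -> Option<&Vec<f32>> {
    bpr.common_features
        .as_ref()
        .and_then(|cf| cf.tensors.as_ref())
        .and_then(|tensors| tensors.get(&feature_id))
}

fn main() {
    let bpr = Bpr {
        common_features: Some(CommonFeatures {
            tensors: Some(HashMap::from([(7, vec![1.0, 2.0])])),
        }),
    };
    assert_eq!(lookup(&bpr, 7), Some(&vec![1.0, 2.0]));
    assert_eq!(lookup(&bpr, 8), None);
}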
@ -300,7 +294,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
}
bpr_start = bpr_end;
}
return Array2::<f32>::from_shape_vec([rows, cols], working_set).unwrap();
Array2::<f32>::from_shape_vec([rows, cols], working_set).unwrap()
}
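The converter fills a flat `Vec<f32>` using `row * cols + col` indexing and then reshapes it with `Array2::from_shape_vec`, which interprets the buffer in row-major order. A minimal sketch of that pattern (assumes the `ndarray` crate already used above):

use ndarray::Array2;

fn main() {
    let (rows, cols) = (2usize, 3usize);
    let mut flat = vec![0.0f32; rows * cols];

    // Row-major flat indexing: element (r, c) lives at r * cols + c.
    for r in 0..rows {
        for c in 0..cols {
            flat[r * cols + c] = (r * cols + c) as f32;
        }
    }

    // from_shape_vec reuses the buffer without copying and checks the length.
    let matrix = Array2::<f32>::from_shape_vec([rows, cols], flat).unwrap();
    assert_eq!(matrix[[1, 2]], 5.0);
}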
// Todo : Refactor, create a generic version with different type and field accessors
@ -310,9 +304,9 @@ impl BatchPredictionRequestToTorchTensorConverter {
// (INT64 --> INT64, DataRecord.discrete_feature)
fn get_continuous(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
// These need to be part of model schema
let rows: usize = batch_ends[batch_ends.len() - 1];
let cols: usize = 5293;
let full_size: usize = (rows * cols).try_into().unwrap();
let rows = batch_ends[batch_ends.len() - 1];
let cols = 5293;
let full_size = rows * cols;
let default_val = f32::NAN;
let mut tensor = vec![default_val; full_size];
@ -337,55 +331,48 @@ impl BatchPredictionRequestToTorchTensorConverter {
.unwrap();
for feature in common_features {
match self.feature_mapper.get(feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index: usize = (r * cols + idx).try_into().unwrap();
tensor[flat_index] = feature.1.into_inner() as f32;
}
if let Some(f_info) = self.feature_mapper.get(feature.0) {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index = r * cols + idx;
tensor[flat_index] = feature.1.into_inner() as f32;
}
}
None => (),
}
if self.continuous_features_to_report.contains(feature.0) {
self.continuous_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner() as f64)
.observe(feature.1.into_inner())
} else if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner() as f64)
.observe(feature.1.into_inner())
}
}
}
// Process the batch of datarecords
for r in bpr_start..bpr_end {
let dr: &DataRecord =
&bpr.individual_features_list[usize::try_from(r - bpr_start).unwrap()];
let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start];
if dr.continuous_features.is_some() {
for feature in dr.continuous_features.as_ref().unwrap() {
match self.feature_mapper.get(&feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
let flat_index: usize = (r * cols + idx).try_into().unwrap();
if flat_index < tensor.len() && idx < cols {
tensor[flat_index] = feature.1.into_inner() as f32;
}
if let Some(f_info) = self.feature_mapper.get(feature.0) {
let idx = f_info.index_within_tensor as usize;
let flat_index = r * cols + idx;
if flat_index < tensor.len() && idx < cols {
tensor[flat_index] = feature.1.into_inner() as f32;
}
None => (),
}
if self.continuous_features_to_report.contains(feature.0) {
self.continuous_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner() as f64)
.observe(feature.1.into_inner())
} else if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner() as f64)
.observe(feature.1.into_inner())
}
}
}
@ -393,22 +380,19 @@ impl BatchPredictionRequestToTorchTensorConverter {
bpr_start = bpr_end;
}
return InputTensor::FloatTensor(
Array2::<f32>::from_shape_vec(
[rows.try_into().unwrap(), cols.try_into().unwrap()],
tensor,
)
.unwrap()
.into_dyn(),
);
InputTensor::FloatTensor(
Array2::<f32>::from_shape_vec([rows, cols], tensor)
.unwrap()
.into_dyn(),
)
}
fn get_binary(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
// These need to be part of model schema
let rows: usize = batch_ends[batch_ends.len() - 1];
let cols: usize = 149;
let full_size: usize = (rows * cols).try_into().unwrap();
let default_val: i64 = 0;
let rows = batch_ends[batch_ends.len() - 1];
let cols = 149;
let full_size = rows * cols;
let default_val = 0;
let mut v = vec![default_val; full_size];
@ -432,55 +416,48 @@ impl BatchPredictionRequestToTorchTensorConverter {
.unwrap();
for feature in common_features {
match self.feature_mapper.get(feature) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index: usize = (r * cols + idx).try_into().unwrap();
v[flat_index] = 1;
}
if let Some(f_info) = self.feature_mapper.get(feature) {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index = r * cols + idx;
v[flat_index] = 1;
}
}
None => (),
}
}
}
// Process the batch of datarecords
for r in bpr_start..bpr_end {
let dr: &DataRecord =
&bpr.individual_features_list[usize::try_from(r - bpr_start).unwrap()];
let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start];
if dr.binary_features.is_some() {
for feature in dr.binary_features.as_ref().unwrap() {
match self.feature_mapper.get(&feature) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
let flat_index: usize = (r * cols + idx).try_into().unwrap();
v[flat_index] = 1;
}
None => (),
if let Some(f_info) = self.feature_mapper.get(feature) {
let idx = f_info.index_within_tensor as usize;
let flat_index = r * cols + idx;
v[flat_index] = 1;
}
}
}
}
bpr_start = bpr_end;
}
return InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows.try_into().unwrap(), cols.try_into().unwrap()], v)
InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows, cols], v)
.unwrap()
.into_dyn(),
);
)
}
#[allow(dead_code)]
fn get_discrete(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
// These need to be part of model schema
let rows: usize = batch_ends[batch_ends.len() - 1];
let cols: usize = 320;
let full_size: usize = (rows * cols).try_into().unwrap();
let default_val: i64 = 0;
let rows = batch_ends[batch_ends.len() - 1];
let cols = 320;
let full_size = rows * cols;
let default_val = 0;
let mut v = vec![default_val; full_size];
@ -504,18 +481,15 @@ impl BatchPredictionRequestToTorchTensorConverter {
.unwrap();
for feature in common_features {
match self.feature_mapper.get(feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index: usize = (r * cols + idx).try_into().unwrap();
v[flat_index] = *feature.1;
}
if let Some(f_info) = self.feature_mapper.get(feature.0) {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index = r * cols + idx;
v[flat_index] = *feature.1;
}
}
None => (),
}
if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics
@ -527,18 +501,15 @@ impl BatchPredictionRequestToTorchTensorConverter {
// Process the batch of datarecords
for r in bpr_start..bpr_end {
let dr: &DataRecord = &bpr.individual_features_list[usize::try_from(r).unwrap()];
let dr: &DataRecord = &bpr.individual_features_list[r];
if dr.discrete_features.is_some() {
for feature in dr.discrete_features.as_ref().unwrap() {
match self.feature_mapper.get(&feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
let flat_index: usize = (r * cols + idx).try_into().unwrap();
if flat_index < v.len() && idx < cols {
v[flat_index] = *feature.1;
}
if let Some(f_info) = self.feature_mapper.get(feature.0) {
let idx = f_info.index_within_tensor as usize;
let flat_index = r * cols + idx;
if flat_index < v.len() && idx < cols {
v[flat_index] = *feature.1;
}
None => (),
}
if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics
@ -550,11 +521,11 @@ impl BatchPredictionRequestToTorchTensorConverter {
}
bpr_start = bpr_end;
}
return InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows.try_into().unwrap(), cols.try_into().unwrap()], v)
InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows, cols], v)
.unwrap()
.into_dyn(),
);
)
}
fn get_user_embedding(
@ -604,7 +575,7 @@ impl Converter for BatchPredictionRequestToTorchTensorConverter {
.map(|bpr| bpr.individual_features_list.len())
.scan(0usize, |acc, e| {
//running total
*acc = *acc + e;
*acc += e;
Some(*acc)
})
.collect::<Vec<_>>();
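`batch_ends` is a running total over per-request batch sizes, built with `Iterator::scan`. The same prefix-sum idiom in isolation, with made-up sizes:

fn main() {
    // Per-request batch sizes (e.g. individual_features_list lengths).
    let batch_sizes = vec![3usize, 1, 4];

    // scan keeps a running total and yields it after each element,
    // so the result holds the exclusive end offset of every batch.
    let batch_ends: Vec<usize> = batch_sizes
        .iter()
        .scan(0usize, |acc, &len| {
            *acc += len;
            Some(*acc)
        })
        .collect();

    assert_eq!(batch_ends, vec![3, 4, 8]);
}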

View File

@ -1,20 +1,23 @@
use npyz::WriterBuilder;
use npyz::{AutoSerialize, WriteOptions};
use std::io::BufWriter;
use std::{
fs::File,
io::{self, BufRead},
use {
npyz::{AutoSerialize, WriteOptions, WriterBuilder},
std::{
fs::File,
io::{BufRead, BufReader, BufWriter},
},
};
pub fn load_batch_prediction_request_base64(file_name: &str) -> Vec<Vec<u8>> {
let file = File::open(file_name).expect("could not read file");
let mut result = vec![];
for line in io::BufReader::new(file).lines() {
for (mut line_count, line) in BufReader::new(file).lines().enumerate() {
line_count += 1;
#[allow(deprecated)]
let decoded = base64::decode(line.unwrap().trim());
match decoded {
Ok(payload) => result.push(payload),
Err(err) => println!("error decoding line {}", err),
Err(err) => println!("error decoding line {file_name}:{line_count} - {err}"),
}
}
println!("reslt len: {}", result.len());

navi/navi/Cargo.lock (generated), 106 changed lines
View File

@ -311,9 +311,9 @@ dependencies = [
[[package]]
name = "cargo_metadata"
version = "0.15.3"
version = "0.15.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08a1ec454bc3eead8719cb56e15dbbfecdbc14e4b3a3ae4936cc6e31f5fc0d07"
checksum = "eee4243f1f26fc7a42710e7439c149e2b10b05472f88090acce52632f231a73a"
dependencies = [
"camino",
"cargo-platform",
@ -707,7 +707,7 @@ checksum = "50d6a0976c999d473fe89ad888d5a284e55366d9dc9038b1ba2aa15128c4afa0"
dependencies = [
"errno-dragonfly",
"libc",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@ -738,7 +738,7 @@ dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.2.16",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@ -1148,13 +1148,13 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
[[package]]
name = "io-lifetimes"
version = "1.0.9"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09270fd4fa1111bc614ed2246c7ef56239a3063d5be0d1ec3b589c505d400aeb"
checksum = "9c66c74d2ae7e79a5a8f7ac924adbe38ee42a859c6539ad869eb51f0b52dc220"
dependencies = [
"hermit-abi 0.3.1",
"libc",
"windows-sys",
"windows-sys 0.48.0",
]
[[package]]
@ -1166,7 +1166,7 @@ dependencies = [
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@ -1216,9 +1216,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.140"
version = "0.2.141"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c"
checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5"
[[package]]
name = "libloading"
@ -1322,7 +1322,7 @@ dependencies = [
"libc",
"log",
"wasi",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@ -1576,7 +1576,7 @@ dependencies = [
"libc",
"redox_syscall 0.2.16",
"smallvec",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@ -2037,16 +2037,16 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustix"
version = "0.37.6"
version = "0.37.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d097081ed288dfe45699b72f5b5d648e5f15d64d900c7080273baa20c16a6849"
checksum = "2aae838e49b3d63e9274e1c01833cc8139d3fec468c3b84688c628f44b1ae11d"
dependencies = [
"bitflags",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@ -2363,7 +2363,7 @@ dependencies = [
"fastrand",
"redox_syscall 0.3.5",
"rustix",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@ -2518,7 +2518,7 @@ dependencies = [
"socket2",
"tokio-macros",
"tracing",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@ -3063,7 +3063,16 @@ version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets",
"windows-targets 0.42.2",
]
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets 0.48.0",
]
[[package]]
@ -3072,21 +3081,42 @@ version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
]
[[package]]
name = "windows-targets"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
dependencies = [
"windows_aarch64_gnullvm 0.48.0",
"windows_aarch64_msvc 0.48.0",
"windows_i686_gnu 0.48.0",
"windows_i686_msvc 0.48.0",
"windows_x86_64_gnu 0.48.0",
"windows_x86_64_gnullvm 0.48.0",
"windows_x86_64_msvc 0.48.0",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.36.1"
@ -3099,6 +3129,12 @@ version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
[[package]]
name = "windows_i686_gnu"
version = "0.36.1"
@ -3111,6 +3147,12 @@ version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]]
name = "windows_i686_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
[[package]]
name = "windows_i686_msvc"
version = "0.36.1"
@ -3123,6 +3165,12 @@ version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]]
name = "windows_i686_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
[[package]]
name = "windows_x86_64_gnu"
version = "0.36.1"
@ -3135,12 +3183,24 @@ version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
[[package]]
name = "windows_x86_64_msvc"
version = "0.36.1"
@ -3153,6 +3213,12 @@ version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
name = "xattr"
version = "0.2.3"

View File

@ -22,8 +22,8 @@ path = "src/bin/navi_onnx.rs"
required-features = ["onnx"]
[features]
default = [ ]
navi_console = [ ]
default = []
navi_console = []
torch = ["tch"]
onnx = ["ort"]
tf = ["tensorflow"]

View File

@ -1,5 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
//::compile_protos("proto/tensorflow_serving/apis/prediction_service.proto")?;
// ::compile_protos("proto/tensorflow_serving/apis/prediction_service.proto")?;
tonic_build::configure().compile(
&[
"proto/tensorflow_serving/apis/prediction_service.proto",

View File

@ -1,23 +1,26 @@
use arrayvec::ArrayVec;
use itertools::Itertools;
use log::info;
use std::sync::Arc;
use tokio::sync::oneshot::Sender;
use tokio::time::Instant;
use crate::bootstrap::{TensorInput, TensorInputEnum};
use crate::cli_args::{ARGS, MODEL_SPECS};
use crate::metrics::{
BATCH_SIZE, BATCH_SIZE_BY_MODEL, BLOCKING_REQUEST_NUM, MODEL_INFERENCE_TIME_COLLECTOR,
NUM_BATCHES_DROPPED, NUM_BATCHES_DROPPED_BY_MODEL, NUM_BATCH_PREDICTION,
NUM_BATCH_PREDICTION_BY_MODEL, NUM_PREDICTION_BY_MODEL, NUM_REQUESTS_DROPPED,
NUM_REQUESTS_DROPPED_BY_MODEL,
use {
super::{
bootstrap::{TensorInput, TensorInputEnum},
cli_args::{ARGS, MODEL_SPECS},
metrics::{
BATCH_SIZE, BATCH_SIZE_BY_MODEL, BLOCKING_REQUEST_NUM, MODEL_INFERENCE_TIME_COLLECTOR,
NUM_BATCHES_DROPPED, NUM_BATCHES_DROPPED_BY_MODEL, NUM_BATCH_PREDICTION,
NUM_BATCH_PREDICTION_BY_MODEL, NUM_PREDICTION_BY_MODEL, NUM_REQUESTS_DROPPED,
NUM_REQUESTS_DROPPED_BY_MODEL,
},
predict_service::Model,
tf_proto::{
tensorflow_serving::{model_spec::VersionChoice, PredictRequest},
DataType,
},
Callback, PredictResult, MAX_NUM_INPUTS,
},
arrayvec::ArrayVec,
itertools::Itertools,
log::info,
std::sync::Arc,
tokio::{sync::oneshot::Sender, time::Instant},
};
use crate::predict_service::Model;
use crate::tf_proto::tensorflow_serving::model_spec::VersionChoice;
use crate::tf_proto::tensorflow_serving::PredictRequest;
use crate::tf_proto::DataType;
use crate::{Callback, PredictResult, MAX_NUM_INPUTS};
#[derive(Debug)]
pub struct BatchPredictor<T: Model> {
@ -116,7 +119,7 @@ impl<T: Model> BatchPredictor<T> {
#[inline(always)]
pub fn push(&mut self, val: Vec<TensorInput>, resp: Sender<PredictResult>, ts: Instant) {
if self.input_tensors.is_empty() {
//only when queue is empty then we update ts to represent first request time
// only when queue is empty then we update ts to represent first request time
self.queue_reset_ts = Instant::now();
self.queue_earliest_rq_ts = ts;
}
@ -134,18 +137,18 @@ impl<T: Model> BatchPredictor<T> {
let mut batch_input_tensors = Vec::with_capacity(self.max_batch_size);
let mut batch_callbacks = Vec::with_capacity(self.max_batch_size);
let mut batch_size = 0;
//now we swap so we can take two queues to the blocking-send thread and reset current queues
// now we swap so we can take two queues to the blocking-send thread and reset current queues
std::mem::swap(&mut self.input_tensors, &mut batch_input_tensors);
std::mem::swap(&mut self.callbacks, &mut batch_callbacks);
std::mem::swap(&mut self.cur_batch_size, &mut batch_size);
let model = self.model.clone();
let batch_earliest_rq_ts = self.queue_earliest_rq_ts;
//info!("batch predict for model:{}, size:{}", self.tf_model.export_dir, vals0.len());
// info!("batch predict for model:{}, size:{}", self.tf_model.export_dir, vals0.len());
BLOCKING_REQUEST_NUM.inc();
tokio::task::spawn_blocking(move || {
//proactively drop stale batches, we drop the entire batch
//as long as one request in that batch is stale. We may drop more than we could this way
//but this should work fairly decently well
// proactively drop stale batches, we drop the entire batch
// as long as one request in that batch is stale. We may drop more than we could this way
// but this should work fairly decently well
if (batch_earliest_rq_ts.elapsed().as_millis() as u64) < ARGS.batch_drop_millis {
let model_inference_time_start = Instant::now();
let (tensor_outs, batch_ends) =
@ -164,7 +167,7 @@ impl<T: Model> BatchPredictor<T> {
.send(PredictResult::Ok(tensors_send_back, model.version()))
.is_err()
{
//use dropped metrics here as this is expected under high load
// use dropped metrics here as this is expected under high load
NUM_REQUESTS_DROPPED.inc();
NUM_REQUESTS_DROPPED_BY_MODEL
.with_label_values(&[&MODEL_SPECS[model.model_idx()]])

View File

@ -1,26 +1,31 @@
use anyhow::Result;
use log::info;
use navi::cli_args::{ARGS, MODEL_SPECS};
use navi::cores::validator::validatior::cli_validator;
use navi::tf_model::tf::TFModel;
use navi::{bootstrap, metrics};
use sha256::digest;
use {
anyhow::Result,
log::info,
navi::{
bootstrap,
cli_args::{ARGS, MODEL_SPECS},
cores::validator::validatior::cli_validator,
metrics,
tf_model::tf::TFModel,
},
sha256::digest,
};
fn main() -> Result<()> {
env_logger::init();
cli_validator::validate_input_args();
//only validate in for tf as other models don't have this
// only validate in for tf as other models don't have this
assert_eq!(MODEL_SPECS.len(), ARGS.serving_sig.len());
metrics::register_custom_metrics();
//load all the custom ops - comma seperaed
// load all the custom ops - comma separated
if let Some(ref customops_lib) = ARGS.customops_lib {
for op_lib in customops_lib.split(",") {
load_custom_op(op_lib);
}
}
// versioning the customop so library
// versioning the custom op to library
bootstrap::bootstrap(TFModel::new)
}
@ -28,15 +33,14 @@ fn load_custom_op(lib_path: &str) -> () {
let res = tensorflow::Library::load(lib_path);
info!("{} load status:{:?}", lib_path, res);
let customop_version_num = get_custom_op_version(lib_path);
// Last OP version is recorded
// Last op version is recorded
metrics::CUSTOMOP_VERSION.set(customop_version_num);
}
//fn get_custom_op_version(customops_lib: &String) -> i64 {
fn get_custom_op_version(customops_lib: &str) -> i64 {
let customop_bytes = std::fs::read(customops_lib).unwrap(); // Vec<u8>
let customop_hash = digest(customop_bytes.as_slice());
//conver the last 4 hex digits to version number as prometheus metrics doesn't support string, the total space is 16^4 == 65536
// convert the last 4 hex digits to version number as prometheus metrics doesn't support string, the total space is 16^4 == 65536
let customop_version_num =
i64::from_str_radix(&customop_hash[customop_hash.len() - 4..], 16).unwrap();
info!(

View File

@ -1,7 +1,12 @@
use anyhow::Result;
use navi::cli_args::{ARGS, MODEL_SPECS};
use navi::onnx_model::onnx::OnnxModel;
use navi::{bootstrap, metrics};
use {
anyhow::Result,
navi::{
bootstrap,
cli_args::{ARGS, MODEL_SPECS},
metrics,
onnx_model::onnx::OnnxModel,
},
};
fn main() -> Result<()> {
env_logger::init();

View File

@ -1,16 +1,16 @@
use anyhow::Result;
use log::info;
use navi::cli_args::ARGS;
use navi::metrics;
use navi::torch_model::torch::TorchModel;
use {
anyhow::Result,
log::info,
navi::{cli_args::ARGS, metrics, torch_model::torch::TorchModel},
};
fn main() -> Result<()> {
env_logger::init();
//torch only has global threadpool settings versus tf has per model threadpool settings
// torch only has global threadpool settings versus tf has per model threadpool settings
assert_eq!(1, ARGS.inter_op_parallelism.len());
assert_eq!(1, ARGS.intra_op_parallelism.len());
//TODO for now we, we assume each model's output has only 1 tensor.
//this will be lifted once torch_model properly implements mtl outputs
// TODO for now we, we assume each model's output has only 1 tensor.
// this will be lifted once torch_model properly implements mtl outputs
tch::set_num_interop_threads(ARGS.inter_op_parallelism[0].parse()?);
tch::set_num_threads(ARGS.intra_op_parallelism[0].parse()?);
info!("torch custom ops not used for now");

View File

@ -1,41 +1,46 @@
use anyhow::Result;
use log::{info, warn};
use std::collections::HashMap;
use tokio::time::Instant;
use tonic::{
transport::{Certificate, Identity, Server, ServerTlsConfig},
Request, Response, Status,
};
use {
super::{
cli_args::{ARGS, INPUTS, OUTPUTS},
kf_serving::{
grpc_inference_service_server::GrpcInferenceService, ModelInferRequest,
ModelInferResponse, ModelMetadataRequest, ModelMetadataResponse, ModelReadyRequest,
ModelReadyResponse, ServerLiveRequest, ServerLiveResponse, ServerMetadataRequest,
ServerMetadataResponse, ServerReadyRequest, ServerReadyResponse,
},
metrics::{
NAVI_VERSION, NUM_PREDICTIONS, NUM_REQUESTS_FAILED, NUM_REQUESTS_FAILED_BY_MODEL,
NUM_REQUESTS_RECEIVED, NUM_REQUESTS_RECEIVED_BY_MODEL, RESPONSE_TIME_COLLECTOR,
},
predict_service::{Model, PredictService},
tf_proto::tensorflow_serving::{
model_spec::VersionChoice::Version,
prediction_service_server::{PredictionService, PredictionServiceServer},
// protobuf related
use crate::tf_proto::tensorflow_serving::{
ClassificationRequest, ClassificationResponse, GetModelMetadataRequest,
GetModelMetadataResponse, MultiInferenceRequest, MultiInferenceResponse, PredictRequest,
PredictResponse, RegressionRequest, RegressionResponse,
};
use crate::{
kf_serving::{
grpc_inference_service_server::GrpcInferenceService, ModelInferRequest, ModelInferResponse,
ModelMetadataRequest, ModelMetadataResponse, ModelReadyRequest, ModelReadyResponse,
ServerLiveRequest, ServerLiveResponse, ServerMetadataRequest, ServerMetadataResponse,
ServerReadyRequest, ServerReadyResponse,
// protobuf related
ClassificationRequest,
ClassificationResponse,
GetModelMetadataRequest,
GetModelMetadataResponse,
ModelSpec,
MultiInferenceRequest,
MultiInferenceResponse,
PredictRequest,
PredictResponse,
RegressionRequest,
RegressionResponse,
},
ModelFactory, PredictResult, NAME, VERSION,
},
tf_proto::tensorflow_serving::prediction_service_server::{
PredictionService, PredictionServiceServer,
anyhow::Result,
log::{info, warn},
std::collections::HashMap,
tokio::time::Instant,
tonic::{
transport::{Certificate, Identity, Server, ServerTlsConfig},
Request, Response, Status,
},
ModelFactory, NAME, VERSION,
};
use crate::cli_args::{ARGS, INPUTS, OUTPUTS};
use crate::metrics::{
NAVI_VERSION, NUM_PREDICTIONS, NUM_REQUESTS_FAILED, NUM_REQUESTS_FAILED_BY_MODEL,
NUM_REQUESTS_RECEIVED, NUM_REQUESTS_RECEIVED_BY_MODEL, RESPONSE_TIME_COLLECTOR,
};
use crate::predict_service::{Model, PredictService};
use crate::tf_proto::tensorflow_serving::model_spec::VersionChoice::Version;
use crate::tf_proto::tensorflow_serving::ModelSpec;
use crate::PredictResult;
#[derive(Debug)]
pub enum TensorInputEnum {
String(Vec<Vec<u8>>),
@ -87,11 +92,11 @@ impl TensorInputEnum {
}
acc
})
.unwrap() //invariant: we expect there's always rows in input_tensors
.unwrap() // invariant: we expect there's always rows in input_tensors
}
}
///entry point for tfServing gRPC
// entry point for tfServing gRPC
#[tonic::async_trait]
impl<T: Model> GrpcInferenceService for PredictService<T> {
async fn server_live(
@ -191,7 +196,7 @@ impl<T: Model> PredictionService for PredictService<T> {
PredictResult::Ok(tensors, version) => {
let mut outputs = HashMap::new();
NUM_PREDICTIONS.with_label_values(&[&model_spec]).inc();
//FIXME: uncomment when prediction scores are normal
// FIXME: uncomment when prediction scores are normal
// PREDICTION_SCORE_SUM
// .with_label_values(&[&model_spec])
// .inc_by(tensors[0]as f64);
@ -240,7 +245,7 @@ impl<T: Model> PredictionService for PredictService<T> {
pub fn bootstrap<T: Model>(model_factory: ModelFactory<T>) -> Result<()> {
info!("package: {}, version: {}, args: {:?}", NAME, VERSION, *ARGS);
//we follow SemVer. So here we assume MAJOR.MINOR.PATCH
// we follow SemVer. So here we assume MAJOR.MINOR.PATCH
let parts = VERSION
.split(".")
.map(|v| v.parse::<i64>())

View File

@ -1,19 +1,23 @@
use crate::{MAX_NUM_INPUTS, MAX_NUM_MODELS, MAX_NUM_OUTPUTS};
use arrayvec::ArrayVec;
use clap::Parser;
use log::info;
use once_cell::sync::OnceCell;
use std::error::Error;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use {
super::{MAX_NUM_INPUTS, MAX_NUM_MODELS, MAX_NUM_OUTPUTS},
arrayvec::ArrayVec,
clap::Parser,
log::info,
once_cell::sync::OnceCell,
std::error::Error,
time::{format_description::well_known::Rfc3339, OffsetDateTime},
};
// Navi is configured through CLI arguments(for now) defined below.
// TODO: use clap_serde to make it config file driven
#[derive(Parser, Debug, Clone)]
///Navi is configured through CLI arguments(for now) defined below.
//TODO: use clap_serde to make it config file driven
pub struct Args {
#[clap(short, long, help = "gRPC port Navi runs ons")]
pub port: i32,
#[clap(long, default_value_t = 9000, help = "prometheus metrics port")]
pub prometheus_port: u16,
#[clap(
short,
long,
@ -21,14 +25,17 @@ pub struct Args {
help = "number of worker threads for tokio async runtime"
)]
pub num_worker_threads: usize,
#[clap(
long,
default_value_t = 14,
help = "number of blocking threads in tokio blocking thread pool"
)]
pub max_blocking_threads: usize,
#[clap(long, default_value = "16", help = "maximum batch size for a batch")]
pub max_batch_size: Vec<String>,
#[clap(
short,
long,
@ -36,18 +43,21 @@ pub struct Args {
help = "max wait time for accumulating a batch"
)]
pub batch_time_out_millis: Vec<String>,
#[clap(
long,
default_value_t = 90,
help = "threshold to start dropping batches under stress"
)]
pub batch_drop_millis: u64,
#[clap(
long,
default_value_t = 300,
help = "polling interval for new version of a model and META.json config"
)]
pub model_check_interval_secs: u64,
#[clap(
short,
long,
@ -55,64 +65,77 @@ pub struct Args {
help = "root directory for models"
)]
pub model_dir: Vec<String>,
#[clap(
long,
help = "directory containing META.json config. separate from model_dir to facilitate remote config management"
)]
pub meta_json_dir: Option<String>,
#[clap(short, long, default_value = "", help = "directory for ssl certs")]
pub ssl_dir: String,
#[clap(
long,
help = "call out to external process to check model updates. custom logic can be written to pull from hdfs, gcs etc"
)]
pub modelsync_cli: Option<String>,
#[clap(
long,
default_value_t = 1,
help = "specify how many versions Navi retains in memory. good for cases of rolling model upgrade"
)]
pub versions_per_model: usize,
#[clap(
short,
long,
help = "most runtimes support loading ops custom writen. currently only implemented for TF"
)]
pub customops_lib: Option<String>,
#[clap(
long,
default_value = "8",
help = "number of threads to paralleling computations inside an op"
)]
pub intra_op_parallelism: Vec<String>,
#[clap(
long,
default_value = "14",
help = "number of threads to parallelize computations of the graph"
)]
pub inter_op_parallelism: Vec<String>,
#[clap(
long,
default_value = "serving_default",
help = "signature of a serving. only TF"
)]
pub serving_sig: Vec<String>,
#[clap(long, default_value = "examples", help = "name of each input tensor")]
pub input: Vec<String>,
#[clap(long, default_value = "output_0", help = "name of each output tensor")]
pub output: Vec<String>,
#[clap(
long,
default_value_t = 500,
help = "max warmup records to use. warmup only implemented for TF"
)]
pub max_warmup_records: usize,
#[clap(
long,
default_value = "true",
help = "when to use graph parallelization. only for ONNX"
)]
pub onnx_use_parallel_mode: String,
// #[clap(long, default_value = "false")]
// pub onnx_use_onednn: String,
#[clap(
@ -121,27 +144,33 @@ pub struct Args {
help = "trace internal memory allocation and generate bulk memory allocations. only for ONNX. turn if off if batch size dynamic"
)]
pub onnx_use_memory_pattern: String,
#[clap(long, value_parser = Args::parse_key_val::<String, String>, value_delimiter=',')]
pub onnx_ep_options: Vec<(String, String)>,
#[clap(long, help = "choice of gpu EPs for ONNX: cuda or tensorrt")]
pub onnx_gpu_ep: Option<String>,
#[clap(
long,
default_value = "home",
help = "converter for various input formats"
)]
pub onnx_use_converter: Option<String>,
#[clap(
long,
help = "whether to enable runtime profiling. only implemented for ONNX for now"
)]
pub profiling: Option<String>,
#[clap(
long,
default_value = "",
help = "metrics reporting for discrete features. only for Home converter for now"
)]
pub onnx_report_discrete_feature_ids: Vec<String>,
#[clap(
long,
default_value = "",
@ -154,7 +183,7 @@ impl Args {
pub fn get_model_specs(model_dir: Vec<String>) -> Vec<String> {
let model_specs = model_dir
.iter()
//let it panic if some model_dir are wrong
// let it panic if some model_dir are wrong
.map(|dir| {
dir.trim_end_matches('/')
.rsplit_once('/')

View File

@ -5,9 +5,9 @@ pub mod validatior {
pub fn validate_input_args() {
assert_eq!(MODEL_SPECS.len(), ARGS.inter_op_parallelism.len());
assert_eq!(MODEL_SPECS.len(), ARGS.intra_op_parallelism.len());
//TODO for now we, we assume each model's output has only 1 tensor.
//this will be lifted once tf_model properly implements mtl outputs
//assert_eq!(OUTPUTS.len(), OUTPUTS.iter().fold(0usize, |a, b| a+b.len()));
// TODO for now we, we assume each model's output has only 1 tensor.
// this will be lifted once tf_model properly implements mtl outputs
// assert_eq!(OUTPUTS.len(), OUTPUTS.iter().fold(0usize, |a, b| a+b.len()));
}
pub fn validate_ps_model_args() {

View File

@ -2,14 +2,17 @@
extern crate lazy_static;
extern crate core;
use crate::bootstrap::TensorInput;
use crate::predict_service::Model;
use crate::tf_proto::{DataType, TensorProto};
use itertools::Itertools;
use serde_json::Value;
use std::ops::Deref;
use tokio::sync::oneshot::Sender;
use tokio::time::Instant;
use {
crate::{
bootstrap::TensorInput,
predict_service::Model,
tf_proto::{DataType, TensorProto},
},
itertools::Itertools,
serde_json::Value,
std::ops::Deref,
tokio::{sync::oneshot::Sender, time::Instant},
};
pub mod batch;
pub mod bootstrap;
@ -25,6 +28,7 @@ pub mod cores {
pub mod tf_proto {
tonic::include_proto!("tensorflow");
pub mod tensorflow_serving {
tonic::include_proto!("tensorflow.serving");
}
@ -33,9 +37,11 @@ pub mod tf_proto {
pub mod kf_serving {
tonic::include_proto!("inference");
}
#[cfg(test)]
mod tests {
use crate::cli_args::Args;
#[test]
fn test_version_string_to_epoch() {
assert_eq!(
@ -47,10 +53,12 @@ mod tests {
}
mod utils {
use crate::cli_args::{ARGS, MODEL_SPECS};
use anyhow::Result;
use log::info;
use serde_json::Value;
use {
crate::cli_args::{ARGS, MODEL_SPECS},
anyhow::Result,
log::info,
serde_json::Value,
};
pub fn read_config(meta_file: &String) -> Result<Value> {
let json = std::fs::read_to_string(meta_file)?;
@ -81,6 +89,7 @@ mod utils {
}
}
}
#[allow(dead_code)]
pub fn get_config_or(model_config: &Value, key: &str, default: &str) -> String {
get_config_or_else(model_config, key, || default.to_string())
@ -111,12 +120,12 @@ pub const MAX_NUM_OUTPUTS: usize = 30;
pub const MAX_NUM_INPUTS: usize = 120;
pub const META_INFO: &str = "META.json";
//use a heap allocated generic type here so that both
//Tensorflow & Pytorch implementation can return their Tensor wrapped in a Box
//without an extra memcopy to Vec
// use a heap allocated generic type here so that both
// Tensorflow & Pytorch implementation can return their Tensor wrapped in a Box
// without an extra memcopy to Vec
pub type TensorReturn<T> = Box<dyn Deref<Target = [T]>>;
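`TensorReturn<T>` boxes anything that derefs to a slice of `T`, so a runtime can hand back its native tensor buffer or a plain `Vec` without an extra copy, and `extract_batch` below then slices it by range. A minimal std-only sketch of the alias in use:

use std::ops::Deref;

// Same shape as the alias above: any boxed value that derefs to a slice of T.
type TensorReturn<T> = Box<dyn Deref<Target = [T]>>;

// A plain Vec<f32> already derefs to [f32], so it can be returned without copying.
fn scores_from_vec() -> TensorReturn<f32> {
    Box::new(vec![0.1f32, 0.2, 0.3])
}

fn main() {
    let scores = scores_from_vec();
    // Range-index through the box, the way extract_batch slices out one client's batch.
    let batch: Vec<f32> = scores[0..2].to_vec();
    assert_eq!(batch, vec![0.1, 0.2]);
}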
//returned tensor may be int64 i.e., a list of relevant ad ids
// returned tensor may be int64 i.e., a list of relevant ad ids
pub enum TensorReturnEnum {
FloatTensorReturn(TensorReturn<f32>),
StringTensorReturn(TensorReturn<String>),
@ -134,9 +143,11 @@ impl TensorReturnEnum {
TensorReturnEnum::Int64TensorReturn(i64_return) => {
TensorScores::Int64TensorScores(i64_return[start..end].to_vec())
}
TensorReturnEnum::Int32TensorReturn(i32_return) => {
TensorScores::Int32TensorScores(i32_return[start..end].to_vec())
}
TensorReturnEnum::StringTensorReturn(str_return) => {
TensorScores::StringTensorScores(str_return[start..end].to_vec())
}
@ -206,10 +217,8 @@ pub enum PredictMessage<T: Model> {
Instant,
),
UpsertModel(T),
/*
#[allow(dead_code)]
DeleteModel(usize),
*/
// #[allow(dead_code)]
// DeleteModel(usize),
}
#[derive(Debug)]

View File

@ -1,10 +1,12 @@
use crate::{NAME, VERSION};
use log::error;
use prometheus::{
CounterVec, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
Opts, Registry,
use {
crate::{NAME, VERSION},
log::error,
prometheus::{
CounterVec, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
Opts, Registry,
},
warp::{Rejection, Reply},
};
use warp::{Rejection, Reply};
lazy_static! {
pub static ref REGISTRY: Registry = Registry::new();

View File

@ -1,25 +1,30 @@
#[cfg(feature = "onnx")]
pub mod onnx {
use crate::bootstrap::{TensorInput, TensorInputEnum};
use crate::cli_args::{Args, ARGS, INPUTS, MODEL_SPECS, OUTPUTS};
use crate::metrics::{self, CONVERTER_TIME_COLLECTOR};
use crate::predict_service::Model;
use crate::TensorReturnEnum;
use crate::{utils, MAX_NUM_INPUTS, MAX_NUM_OUTPUTS, META_INFO};
use anyhow::Result;
use arrayvec::ArrayVec;
use dr_transform::converter::{BatchPredictionRequestToTorchTensorConverter, Converter};
use itertools::Itertools;
use log::{debug, info};
use ort::environment::Environment;
use ort::session::Session;
use ort::tensor::InputTensor;
use ort::{ExecutionProvider, GraphOptimizationLevel, SessionBuilder};
use serde_json::Value;
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::{fmt, fs};
use tokio::time::Instant;
use {
crate::{
bootstrap::{TensorInput, TensorInputEnum},
cli_args::{Args, ARGS, INPUTS, MODEL_SPECS, OUTPUTS},
metrics::{self, CONVERTER_TIME_COLLECTOR},
predict_service::Model,
utils, TensorReturnEnum, MAX_NUM_INPUTS, MAX_NUM_OUTPUTS, META_INFO,
},
anyhow::Result,
arrayvec::ArrayVec,
dr_transform::converter::{BatchPredictionRequestToTorchTensorConverter, Converter},
itertools::Itertools,
log::{debug, info},
ort::{
environment::Environment, session::Session, tensor::InputTensor, ExecutionProvider,
GraphOptimizationLevel, SessionBuilder,
},
serde_json::Value,
std::{
fmt::{self, Debug, Display},
fs,
sync::Arc,
},
tokio::time::Instant,
};
lazy_static! {
pub static ref ENVIRONMENT: Arc<Environment> = Arc::new(
@ -39,6 +44,7 @@ pub mod onnx {
pub output_filters: ArrayVec<usize, MAX_NUM_OUTPUTS>,
pub input_converter: Box<dyn Converter>,
}
impl Display for OnnxModel {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
@ -52,6 +58,7 @@ pub mod onnx {
)
}
}
impl Drop for OnnxModel {
fn drop(&mut self) {
if ARGS.profiling != None {
@ -66,6 +73,7 @@ pub mod onnx {
}
}
}
impl OnnxModel {
fn get_output_filters(session: &Session, idx: usize) -> ArrayVec<usize, MAX_NUM_OUTPUTS> {
OUTPUTS[idx]
@ -190,13 +198,13 @@ pub mod onnx {
}
}
///Currently we only assume the input as just one string tensor.
///The string tensor will be be converted to the actual raw tensors.
/// The converter we are using is very specific to home.
/// It reads a BatchDataRecord thrift and decode it to a batch of raw input tensors.
/// Navi will then do server side batching and feed it to ONNX runtime
// Currently we only assume the input as just one string tensor.
// The string tensor will be be converted to the actual raw tensors.
// The converter we are using is very specific to home.
// It reads a BatchDataRecord thrift and decode it to a batch of raw input tensors.
// Navi will then do server side batching and feed it to ONNX runtime
impl Model for OnnxModel {
//TODO: implement a generic online warmup for all runtimes
// TODO: implement a generic online warmup for all runtimes
fn warmup(&self) -> Result<()> {
Ok(())
}
@ -228,7 +236,7 @@ pub mod onnx {
}
})
.unzip();
//invariant we only support one input as string. will relax later
// invariant we only support one input as string. will relax later
assert_eq!(inputs.len(), 1);
let output_tensors = self
.session
@ -248,9 +256,9 @@ pub mod onnx {
.collect::<Vec<_>>();
(
//only works for batch major
//TODO: to_vec() obviously wasteful, especially for large batches(GPU) . Will refactor to
//break up output and return Vec<Vec<TensorScore>> here
// only works for batch major
// TODO: to_vec() obviously wasteful, especially for large batches(GPU). Will refactor to
// break up output and return Vec<Vec<TensorScore>> here
TensorReturnEnum::FloatTensorReturn(Box::new(
output.view().as_slice().unwrap().to_vec(),
)),

View File

@ -1,35 +1,40 @@
use anyhow::{anyhow, Result};
use arrayvec::ArrayVec;
use itertools::Itertools;
use log::{error, info, warn};
use std::fmt::{Debug, Display};
use std::string::String;
use std::sync::Arc;
use std::time::Duration;
use tokio::process::Command;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Instant};
use warp::Filter;
use crate::batch::BatchPredictor;
use crate::bootstrap::TensorInput;
use crate::{
metrics, utils, ModelFactory, PredictMessage, PredictResult, TensorReturnEnum, MAX_NUM_MODELS,
MAX_VERSIONS_PER_MODEL, META_INFO,
use {
crate::{
batch::BatchPredictor,
bootstrap::TensorInput,
cli_args::{ARGS, MODEL_SPECS},
cores::validator::validatior::cli_validator,
metrics,
metrics::MPSC_CHANNEL_SIZE,
utils, ModelFactory, PredictMessage, PredictResult, TensorReturnEnum, MAX_NUM_MODELS,
MAX_VERSIONS_PER_MODEL, META_INFO,
},
anyhow::{anyhow, Result},
arrayvec::ArrayVec,
itertools::Itertools,
log::{error, info, warn},
serde_json::{self, Value},
std::{
fmt::{Debug, Display},
sync::Arc,
time::Duration,
},
tokio::{
process::Command,
sync::{
mpsc::{self, error::TryRecvError, Receiver, Sender},
oneshot,
},
time::{sleep, Instant},
},
warp::Filter,
};
use crate::cli_args::{ARGS, MODEL_SPECS};
use crate::cores::validator::validatior::cli_validator;
use crate::metrics::MPSC_CHANNEL_SIZE;
use serde_json::{self, Value};
pub trait Model: Send + Sync + Display + Debug + 'static {
fn warmup(&self) -> Result<()>;
//TODO: refactor this to return Vec<Vec<TensorScores>>, i.e.
//we have the underlying runtime impl to split the response to each client.
//It will eliminate some inefficient memory copy in onnx_model.rs as well as simplify code
// TODO: refactor this to return Vec<Vec<TensorScores>>, i.e.
// we have the underlying runtime impl to split the response to each client.
// It will eliminate some inefficient memory copy in onnx_model.rs as well as simplify code
fn do_predict(
&self,
input_tensors: Vec<Vec<TensorInput>>,
@ -43,6 +48,7 @@ pub trait Model: Send + Sync + Display + Debug + 'static {
pub struct PredictService<T: Model> {
tx: Sender<PredictMessage<T>>,
}
impl<T: Model> PredictService<T> {
pub async fn init(model_factory: ModelFactory<T>) -> Self {
cli_validator::validate_ps_model_args();
@ -139,7 +145,7 @@ impl<T: Model> PredictService<T> {
cur_version.to_string()
})
});
//as long as next version doesn't match cur version maintained we reload
// as long as next version doesn't match cur version maintained we reload
if next_version.ne(cur_version) {
info!("reload the version: {}->{}", cur_version, next_version);
PredictService::load_latest_model_from_model_dir(
@ -180,14 +186,14 @@ impl<T: Model> PredictService<T> {
}
let meta_dir = utils::get_meta_dir();
let meta_file = format!("{}{}", meta_dir, META_INFO);
//initialize the latest version array
// initialize the latest version array
let mut cur_versions = vec!["".to_owned(); MODEL_SPECS.len()];
loop {
let config = utils::read_config(&meta_file).unwrap_or_else(|e| {
warn!("config file {} not found due to: {}", meta_file, e);
Value::Null
});
info!("***polling for models***"); //nice deliminter
info!("***polling for models***"); // nice deliminter
info!("config:{}", config);
if let Some(ref cli) = ARGS.modelsync_cli {
if let Err(e) = call_external_modelsync(cli, &cur_versions).await {
@ -267,7 +273,7 @@ impl<T: Model> PredictService<T> {
cur_batch_size: 0,
max_batch_size: max_batch_size[idx],
batch_time_out_millis: batch_time_out_millis[idx],
//initialize to be current time
// initialize to be current time
queue_reset_ts: Instant::now(),
queue_earliest_rq_ts: Instant::now(),
};
@ -277,7 +283,7 @@ impl<T: Model> PredictService<T> {
.inc();
info!("now we serve updated model: {}", predictor.model);
//we can do this since the vector is small
// we can do this since the vector is small
let predictors = &mut all_model_predictors[idx];
if predictors.len() == ARGS.versions_per_model {
predictors.remove(predictors.len() - 1);
@ -289,7 +295,7 @@ impl<T: Model> PredictService<T> {
ArrayVec::<BatchPredictor<T>, MAX_VERSIONS_PER_MODEL>::new();
predictors.push(predictor);
all_model_predictors.push(predictors);
//check the invariant that we always push the last model to the end
// check the invariant that we always push the last model to the end
assert_eq!(all_model_predictors.len(), idx + 1)
}
false
@ -298,9 +304,9 @@ impl<T: Model> PredictService<T> {
Err(TryRecvError::Disconnected) => true,
};
for predictor in all_model_predictors.iter_mut().flatten() {
//if predictor batch queue not empty and times out or no more msg in the queue, flush
// if predictor batch queue not empty and times out or no more msg in the queue, flush
if (!predictor.input_tensors.is_empty() && (predictor.duration_past(predictor.batch_time_out_millis) || no_more_msg))
//if batch queue reaches limit, flush
// if batch queue reaches limit, flush
|| predictor.cur_batch_size >= predictor.max_batch_size
{
predictor.batch_predict();

View File

@ -1,37 +1,37 @@
#[cfg(feature = "tf")]
pub mod tf {
use arrayvec::ArrayVec;
use itertools::Itertools;
use log::{debug, error, info, warn};
use prost::Message;
use std::fmt;
use std::fmt::Display;
use std::string::String;
use tensorflow::io::{RecordReadError, RecordReader};
use tensorflow::Operation;
use tensorflow::SavedModelBundle;
use tensorflow::SessionOptions;
use tensorflow::SessionRunArgs;
use tensorflow::Tensor;
use tensorflow::{DataType, FetchToken, Graph, TensorInfo, TensorType};
use std::thread::sleep;
use std::time::Duration;
use crate::cli_args::{Args, ARGS, INPUTS, MODEL_SPECS, OUTPUTS};
use crate::tf_proto::tensorflow_serving::prediction_log::LogType;
use crate::tf_proto::tensorflow_serving::{PredictLog, PredictionLog};
use crate::tf_proto::ConfigProto;
use anyhow::{Context, Result};
use serde_json::Value;
use crate::bootstrap::{TensorInput, TensorInputEnum};
use crate::metrics::{
INFERENCE_FAILED_REQUESTS_BY_MODEL, NUM_REQUESTS_FAILED, NUM_REQUESTS_FAILED_BY_MODEL,
use {
crate::{
bootstrap::{TensorInput, TensorInputEnum},
cli_args::{Args, ARGS, INPUTS, MODEL_SPECS, OUTPUTS},
metrics::{
INFERENCE_FAILED_REQUESTS_BY_MODEL, NUM_REQUESTS_FAILED,
NUM_REQUESTS_FAILED_BY_MODEL,
},
predict_service::Model,
tf_proto::{
tensorflow_serving::{prediction_log::LogType, PredictLog, PredictionLog},
ConfigProto,
},
utils, TensorReturnEnum, MAX_NUM_INPUTS,
},
anyhow::{Context, Result},
arrayvec::ArrayVec,
itertools::Itertools,
log::{debug, error, info, warn},
prost::Message,
serde_json::Value,
std::{
fmt::{self, Display},
thread::sleep,
time::Duration,
},
tensorflow::{
io::{RecordReadError, RecordReader},
DataType, FetchToken, Graph, Operation, SavedModelBundle, SessionOptions,
SessionRunArgs, Tensor, TensorInfo, TensorType,
},
};
use crate::predict_service::Model;
use crate::TensorReturnEnum;
use crate::{utils, MAX_NUM_INPUTS};
#[derive(Debug)]
pub enum TFTensorEnum {
@ -340,7 +340,7 @@ pub mod tf {
})),
} => {
if warmup_cnt == ARGS.max_warmup_records {
//warm up to max_warmup_records records
// warm up to max_warmup_records records
warn!(
"reached max warmup {} records, exit warmup for {}",
ARGS.max_warmup_records,
@ -477,8 +477,8 @@ pub mod tf {
}
predict_return.push(res)
}
//TODO: remove in the future
//TODO: support actual mtl model outputs
// TODO: remove in the future
// TODO: support actual mtl model outputs
(predict_return, batch_ends)
}

View File

@ -1,24 +1,22 @@
#[cfg(feature = "torch")]
pub mod torch {
use std::fmt;
use std::fmt::Display;
use std::string::String;
use crate::bootstrap::TensorInput;
use crate::cli_args::{Args, ARGS, MODEL_SPECS};
use crate::metrics;
use crate::metrics::{
INFERENCE_FAILED_REQUESTS_BY_MODEL, NUM_REQUESTS_FAILED, NUM_REQUESTS_FAILED_BY_MODEL,
use {
crate::{
bootstrap::TensorInput,
cli_args::{Args, ARGS, MODEL_SPECS},
metrics::{
self, INFERENCE_FAILED_REQUESTS_BY_MODEL, NUM_REQUESTS_FAILED,
NUM_REQUESTS_FAILED_BY_MODEL,
},
predict_service::Model,
SerializedInput, TensorReturnEnum,
},
anyhow::Result,
dr_transform::converter::{BatchPredictionRequestToTorchTensorConverter, Converter},
serde_json::Value,
std::fmt::{self, Display},
tch::{kind, CModule, IValue, Tensor},
};
use crate::predict_service::Model;
use crate::SerializedInput;
use crate::TensorReturnEnum;
use anyhow::Result;
use dr_transform::converter::BatchPredictionRequestToTorchTensorConverter;
use dr_transform::converter::Converter;
use serde_json::Value;
use tch::Tensor;
use tch::{kind, CModule, IValue};
#[derive(Debug)]
pub struct TorchModel {
@ -50,7 +48,7 @@ pub mod torch {
version: Args::version_str_to_epoch(&version)?,
module: model,
export_dir,
//TODO: move converter lookup in a registry.
// TODO: move converter lookup in a registry.
input_converter: Box::new(BatchPredictionRequestToTorchTensorConverter::new(
&ARGS.model_dir[idx].as_str(),
version.as_str(),
@ -65,15 +63,15 @@ pub mod torch {
#[inline(always)]
pub fn decode_to_inputs(bytes: SerializedInput) -> Vec<Tensor> {
//FIXME: for now we generate 4 random tensors as inputs to unblock end to end testing
//when Shajan's decoder is ready we will swap
// FIXME: for now we generate 4 random tensors as inputs to unblock end to end testing
// when Shajan's decoder is ready we will swap
let row = bytes.len() as i64;
let t1 = Tensor::randn(&[row, 5293], kind::FLOAT_CPU); //continuous
let t2 = Tensor::randint(10, &[row, 149], kind::INT64_CPU); //binary
let t3 = Tensor::randint(10, &[row, 320], kind::INT64_CPU); //discrete
let t4 = Tensor::randn(&[row, 200], kind::FLOAT_CPU); //user_embedding
let t5 = Tensor::randn(&[row, 200], kind::FLOAT_CPU); //user_eng_embedding
let t6 = Tensor::randn(&[row, 200], kind::FLOAT_CPU); //author_embedding
let t1 = Tensor::randn(&[row, 5293], kind::FLOAT_CPU); // continuous
let t2 = Tensor::randint(10, &[row, 149], kind::INT64_CPU); // binary
let t3 = Tensor::randint(10, &[row, 320], kind::INT64_CPU); // discrete
let t4 = Tensor::randn(&[row, 200], kind::FLOAT_CPU); // user_embedding
let t5 = Tensor::randn(&[row, 200], kind::FLOAT_CPU); // user_eng_embedding
let t6 = Tensor::randn(&[row, 200], kind::FLOAT_CPU); // author_embedding
vec![t1, t2, t3, t4, t5, t6]
}
@ -139,7 +137,7 @@ pub mod torch {
Ok(())
}
//TODO: torch runtime needs some refactor to make it a generic interface
// TODO: torch runtime needs some refactor to make it a generic interface
#[inline(always)]
fn do_predict(
&self,
@ -150,8 +148,8 @@ pub mod torch {
let mut batch_ends = vec![0usize; input_tensors.len()];
for (i, batch_bytes_in_request) in input_tensors.into_iter().enumerate() {
for _ in batch_bytes_in_request.into_iter() {
//FIXME: for now use some hack
let model_input = TorchModel::decode_to_inputs(vec![0u8; 30]); //self.input_converter.convert(bytes);
// FIXME: for now use some hack
let model_input = TorchModel::decode_to_inputs(vec![0u8; 30]); // self.input_converter.convert(bytes);
let input_batch_tensors = model_input
.into_iter()
.map(|t| IValue::Tensor(t))
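
The IValue inputs assembled above are what a TorchScript module consumes through tch-rs. A rough sketch of driving such a module is below; the model path, tensor shapes, and single-call structure are placeholders, not navi's actual do_predict flow:

use {
    anyhow::Result,
    tch::{kind, CModule, IValue, Tensor},
};

fn run_once(module: &CModule) -> Result<IValue> {
    let inputs: Vec<IValue> = vec![
        IValue::Tensor(Tensor::randn(&[1, 8], kind::FLOAT_CPU)),
        IValue::Tensor(Tensor::randint(10, &[1, 4], kind::INT64_CPU)),
    ];
    // forward_is drives the TorchScript `forward` with IValue arguments
    Ok(module.forward_is(&inputs)?)
}

fn main() -> Result<()> {
    let module = CModule::load("model.pt")?; // placeholder path to a scripted model
    let output = run_once(&module)?;
    println!("{output:?}");
    Ok(())
}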

View File

@ -1,8 +1,7 @@
use std::env;
use std::fs;
use segdense::error::SegDenseError;
use segdense::util;
use {
segdense::{error::SegDenseError, util},
std::{env, fs},
};
fn main() -> Result<(), SegDenseError> {
env_logger::init();

View File

@ -1,5 +1,7 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use {
serde::{Deserialize, Serialize},
serde_json::Value,
};
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
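
For context, rename_all = "camelCase" maps the struct's snake_case fields onto camelCase JSON keys during (de)serialization. A tiny sketch with a hypothetical struct, not the actual transform-spec types:

use serde::{Deserialize, Serialize};

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct FeatureSpec {
    feature_id: i64,
    feature_name: String,
}

fn main() {
    let spec = FeatureSpec { feature_id: 42, feature_name: "user_age".into() };
    // serializes as {"featureId":42,"featureName":"user_age"}
    println!("{}", serde_json::to_string(&spec).unwrap());
}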

View File

@ -1,11 +1,13 @@
use log::debug;
use std::fs;
use serde_json::{Map, Value};
use crate::error::SegDenseError;
use crate::mapper::{FeatureInfo, FeatureMapper, MapWriter};
use crate::segdense_transform_spec_home_recap_2022::{self as seg_dense, InputFeature};
use {
super::{
error::SegDenseError,
mapper::{FeatureInfo, FeatureMapper, MapWriter},
segdense_transform_spec_home_recap_2022::{self as seg_dense, InputFeature},
},
log::debug,
serde_json::{Map, Value},
std::fs,
};
pub fn load_config(file_name: &str) -> seg_dense::Root {
let json_str = fs::read_to_string(file_name)
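
load_config reads the JSON spec into a string and then parses it. A sketch of that read-then-deserialize pattern with a tiny hypothetical struct, not segdense's actual Root spec:

use {serde::Deserialize, std::fs};

#[derive(Debug, Deserialize)]
struct MiniSpec {
    name: String,
    input_features: Vec<i64>,
}

fn load_mini_spec(file_name: &str) -> Result<MiniSpec, Box<dyn std::error::Error>> {
    let json_str = fs::read_to_string(file_name)?;
    Ok(serde_json::from_str(&json_str)?)
}

fn main() {
    match load_mini_spec("transform_spec.json") {
        Ok(spec) => println!("loaded {} with {} features", spec.name, spec.input_features.len()),
        Err(e) => eprintln!("failed to load spec: {e}"),
    }
}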

View File

@ -44,9 +44,9 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
[[package]]
name = "libc"
version = "0.2.140"
version = "0.2.141"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c"
checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5"
[[package]]
name = "log"

View File

@ -6,26 +6,24 @@
#![allow(clippy::too_many_arguments, clippy::type_complexity, clippy::vec_box)]
#![cfg_attr(rustfmt, rustfmt_skip)]
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::{From, TryFrom};
use std::default::Default;
use std::error::Error;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::rc::Rc;
use thrift::OrderedFloat;
use thrift::{ApplicationError, ApplicationErrorKind, ProtocolError, ProtocolErrorKind, TThriftClient};
use thrift::protocol::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, TInputProtocol, TOutputProtocol, TSerializable, TSetIdentifier, TStructIdentifier, TType};
use thrift::protocol::field_id;
use thrift::protocol::verify_expected_message_type;
use thrift::protocol::verify_expected_sequence_number;
use thrift::protocol::verify_expected_service_call;
use thrift::protocol::verify_required_field_exists;
use thrift::server::TProcessor;
use crate::tensor;
use {
std::{
cell::RefCell,
collections::{BTreeMap, BTreeSet},
convert::{From, TryFrom},
default::Default,
error::Error,
fmt::{self, Display, Formatter},
rc::Rc
},
thrift::{
OrderedFloat,
{ApplicationError, ApplicationErrorKind, ProtocolError, ProtocolErrorKind, TThriftClient},
protocol::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, TInputProtocol, TOutputProtocol, TSerializable, TSetIdentifier, TStructIdentifier, TType, field_id, verify_expected_message_type, verify_expected_sequence_number, verify_expected_service_call, verify_required_field_exists},
server::TProcessor,
},
super::tensor
};
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct FeatureType(pub i32);

View File

@ -1,14 +1,12 @@
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use bpr_thrift::data::DataRecord;
use bpr_thrift::prediction_service::BatchPredictionRequest;
use thrift::OrderedFloat;
use thrift::protocol::TBinaryInputProtocol;
use thrift::protocol::TSerializable;
use thrift::transport::TBufferChannel;
use thrift::Result;
use {
bpr_thrift::{data::DataRecord, prediction_service::BatchPredictionRequest},
std::collections::{BTreeMap, BTreeSet},
thrift::{
protocol::{TBinaryInputProtocol, TSerializable},
transport::TBufferChannel,
OrderedFloat, Result,
},
};
fn main() {
let data_path = "/tmp/current/timelines/output-1";
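
A rough sketch of how the imports above are typically combined to decode one binary-thrift BatchPredictionRequest from that file; the buffer capacity and the strict-mode flag are assumptions, and errors are only reported, not handled:

use {
    bpr_thrift::prediction_service::BatchPredictionRequest,
    std::fs,
    thrift::{
        protocol::{TBinaryInputProtocol, TSerializable},
        transport::TBufferChannel,
        Result,
    },
};

fn main() {
    // sample path from the example above; any serialized BatchPredictionRequest works
    let bytes = fs::read("/tmp/current/timelines/output-1").expect("read input file");
    let mut channel = TBufferChannel::with_capacity(bytes.len(), 0);
    channel.set_readable_bytes(&bytes);
    let mut protocol = TBinaryInputProtocol::new(channel, true); // strict mode assumed
    let request: Result<BatchPredictionRequest> =
        BatchPredictionRequest::read_from_in_protocol(&mut protocol);
    println!("decode succeeded: {}", request.is_ok());
}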

View File

@ -6,26 +6,25 @@
#![allow(clippy::too_many_arguments, clippy::type_complexity, clippy::vec_box)]
#![cfg_attr(rustfmt, rustfmt_skip)]
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::{From, TryFrom};
use std::default::Default;
use std::error::Error;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::rc::Rc;
use thrift::OrderedFloat;
use thrift::{ApplicationError, ApplicationErrorKind, ProtocolError, ProtocolErrorKind, TThriftClient};
use thrift::protocol::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, TInputProtocol, TOutputProtocol, TSerializable, TSetIdentifier, TStructIdentifier, TType};
use thrift::protocol::field_id;
use thrift::protocol::verify_expected_message_type;
use thrift::protocol::verify_expected_sequence_number;
use thrift::protocol::verify_expected_service_call;
use thrift::protocol::verify_required_field_exists;
use thrift::server::TProcessor;
use crate::data;
use {
std::{
cell::RefCell,
collections::{BTreeMap, BTreeSet},
convert::{From, TryFrom},
default::Default,
error::Error,
fmt::{self, Display, Formatter},
rc::Rc
},
thrift::{
OrderedFloat,
{ApplicationError, ApplicationErrorKind, ProtocolError, ProtocolErrorKind, TThriftClient},
protocol::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, TInputProtocol, TOutputProtocol, TSerializable, TSetIdentifier, TStructIdentifier, TType, field_id, verify_expected_message_type, verify_expected_sequence_number, verify_expected_service_call, verify_required_field_exists},
server::TProcessor,
},
super::data
};
//
// PredictionServiceException

View File

@ -6,24 +6,24 @@
#![allow(clippy::too_many_arguments, clippy::type_complexity, clippy::vec_box)]
#![cfg_attr(rustfmt, rustfmt_skip)]
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::{From, TryFrom};
use std::default::Default;
use std::error::Error;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::rc::Rc;
use thrift::OrderedFloat;
use thrift::{ApplicationError, ApplicationErrorKind, ProtocolError, ProtocolErrorKind, TThriftClient};
use thrift::protocol::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, TInputProtocol, TOutputProtocol, TSerializable, TSetIdentifier, TStructIdentifier, TType};
use thrift::protocol::field_id;
use thrift::protocol::verify_expected_message_type;
use thrift::protocol::verify_expected_sequence_number;
use thrift::protocol::verify_expected_service_call;
use thrift::protocol::verify_required_field_exists;
use thrift::server::TProcessor;
use {
std::{
cell::RefCell,
collections::{BTreeMap, BTreeSet},
convert::{From, TryFrom},
default::Default,
error::Error,
fmt::{self, Display, Formatter},
rc::Rc
},
thrift::{
OrderedFloat,
{ApplicationError, ApplicationErrorKind, ProtocolError, ProtocolErrorKind, TThriftClient},
protocol::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, TInputProtocol, TOutputProtocol, TSerializable, TSetIdentifier, TStructIdentifier, TType, field_id, verify_expected_message_type, verify_expected_sequence_number, verify_expected_service_call, verify_required_field_exists},
server::TProcessor,
},
super::tensor
};
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct DataType(pub i32);
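
DataType, like FeatureType earlier, follows the open-enum pattern in the generated thrift code: a newtype over i32, which keeps unrecognized wire values representable. A small illustration with made-up constants, not the IDL's real values:

#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct DataType(pub i32);

impl DataType {
    pub const FLOAT: DataType = DataType(0);
    pub const INT64: DataType = DataType(1);
}

fn main() {
    let from_wire = DataType(7); // unknown to this build, but still carried through
    match from_wire {
        DataType::FLOAT => println!("float tensor"),
        DataType::INT64 => println!("int64 tensor"),
        DataType(other) => println!("unrecognized data type {other}"),
    }
}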