mirror of
https://github.com/twitter/the-algorithm-ml.git
synced 2025-01-25 05:11:10 +01:00
88 lines
2.5 KiB
Python
88 lines
2.5 KiB
Python
"""Reader utilities."""
|
|
import itertools
|
|
import time
|
|
from typing import Optional
|
|
|
|
from tml.common.batch import DataclassBatch
|
|
from tml.ml_logging.torch_logging import logging
|
|
|
|
import pyarrow as pa
|
|
import torch
|
|
|
|
|
|
def roundrobin(*iterables):
|
|
"""Round robin through provided iterables, useful for simple load balancing.
|
|
|
|
Adapted from https://docs.python.org/3/library/itertools.html.
|
|
|
|
"""
|
|
num_active = len(iterables)
|
|
nexts = itertools.cycle(iter(it).__next__ for it in iterables)
|
|
while num_active:
|
|
try:
|
|
for _next in nexts:
|
|
result = _next()
|
|
yield result
|
|
except StopIteration:
|
|
# Remove the iterator we just exhausted from the cycle.
|
|
num_active -= 1
|
|
nexts = itertools.cycle(itertools.islice(nexts, num_active))
|
|
logging.warning(f"Iterable exhausted, {num_active} iterables left.")
|
|
except Exception as exc:
|
|
logging.warning(f"Iterable raised exception {exc}, ignoring.")
|
|
# continue
|
|
raise
|
|
|
|
|
|
def speed_check(data_loader, max_steps: int, frequency: int, peek: Optional[int]):
|
|
num_examples = 0
|
|
prev = time.perf_counter()
|
|
for idx, batch in enumerate(data_loader):
|
|
if idx > max_steps:
|
|
break
|
|
if peek and idx % peek == 0:
|
|
logging.info(f"Batch: {batch}")
|
|
num_examples += batch.batch_size
|
|
if idx % frequency == 0:
|
|
now = time.perf_counter()
|
|
elapsed = now - prev
|
|
logging.info(
|
|
f"step: {idx}, "
|
|
f"elapsed(s): {elapsed}, "
|
|
f"examples: {num_examples}, "
|
|
f"ex/s: {num_examples / elapsed}, "
|
|
)
|
|
prev = now
|
|
num_examples = 0
|
|
|
|
|
|
def pa_to_torch(array: pa.array) -> torch.Tensor:
|
|
return torch.from_numpy(array.to_numpy())
|
|
|
|
|
|
def create_default_pa_to_batch(schema) -> DataclassBatch:
|
|
""" """
|
|
_CustomBatch = DataclassBatch.from_schema("DefaultBatch", schema=schema)
|
|
|
|
def get_imputation_value(pa_type):
|
|
type_map = {
|
|
pa.float64(): pa.scalar(0, type=pa.float64()),
|
|
pa.int64(): pa.scalar(0, type=pa.int64()),
|
|
pa.string(): pa.scalar("", type=pa.string()),
|
|
}
|
|
if pa_type not in type_map:
|
|
raise Exception(f"Imputation for type {pa_type} not supported.")
|
|
return type_map[pa_type]
|
|
|
|
def _impute(array: pa.array) -> pa.array:
|
|
return array.fill_null(get_imputation_value(array.type))
|
|
|
|
def _column_to_tensor(record_batch: pa.RecordBatch):
|
|
tensors = {
|
|
col_name: pa_to_torch(_impute(record_batch.column(col_name)))
|
|
for col_name in record_batch.schema.names
|
|
}
|
|
return _CustomBatch(**tensors)
|
|
|
|
return _column_to_tensor
|