mirror of
https://github.com/twitter/the-algorithm.git
synced 2024-06-02 01:08:47 +02:00
73 lines
4.5 KiB
ReStructuredText
73 lines
4.5 KiB
ReStructuredText
|
.. _joining:
|
|||
|
|
|||
|
Joining aggregates features to records
|
|||
|
======================================
|
|||
|
|
|||
|
After setting up either offline batch jobs or online real-time summingbird jobs to produce
|
|||
|
aggregate features and querying them, we are left with data records containing aggregate features.
|
|||
|
This page will go over how to join them with other data records to produce offline training data.
|
|||
|
|
|||
|
(To discuss: joining aggregates to records online)
|
|||
|
|
|||
|
Joining Aggregates on Discrete/String Keys
|
|||
|
------------------------------------------
|
|||
|
|
|||
|
Joining aggregate features keyed on discrete or text features to your training data is very easy -
|
|||
|
you can use the built in methods provided by `DataSetPipe`. For example, suppose you have aggregates
|
|||
|
keyed by `(USER_ID, AUTHOR_ID)`:
|
|||
|
|
|||
|
.. code-block:: scala
|
|||
|
|
|||
|
val userAuthorAggregates: DataSetPipe = AggregatesV2FeatureSource(
|
|||
|
rootPath = “/path/to/my/aggregates”,
|
|||
|
storeName = “user_author_aggregates”,
|
|||
|
aggregates = MyConfig.aggregatesToCompute,
|
|||
|
trimThreshold = 0
|
|||
|
)(dateRange).read
|
|||
|
|
|||
|
Offline, you can then join with your training data set as follows:
|
|||
|
|
|||
|
.. code-block:: scala
|
|||
|
|
|||
|
val myTrainingData: DataSetPipe = ...
|
|||
|
val joinedData = myTrainingData.joinWithLarger((USER_ID, AUTHOR_ID), userAuthorAggregates)
|
|||
|
|
|||
|
You can read from `AggregatesV2MostRecentFeatureSourceBeforeDate` in order to read the most recent aggregates
|
|||
|
before a provided date `beforeDate`. Just note that `beforeDate` must be aligned with the date boundary so if
|
|||
|
you’re passing in a `dateRange`, use `dateRange.end`).
|
|||
|
|
|||
|
Joining Aggregates on Sparse Binary Keys
|
|||
|
----------------------------------------
|
|||
|
|
|||
|
When joining on sparse binary keys, there can be multiple aggregate records to join to each training record in
|
|||
|
your training data set. For example, suppose you have setup an aggregate group that is keyed on `(INTEREST_ID, AUTHOR_ID)`
|
|||
|
capturing engagement counts of users interested in a particular `INTEREST_ID` for specific authors provided by `AUTHOR_ID`.
|
|||
|
|
|||
|
Suppose now that you have a training data record representing a specific user action. This training data record contains
|
|||
|
a sparse binary feature `INTEREST_IDS` representing all the "interests" of that user - e.g. music, sports, and so on. Each `interest_id`
|
|||
|
translates to a different set of counting features found in your aggregates data. Therefore we need a way to merge all of
|
|||
|
these different sets of counting features to produce a more compact, fixed-size set of features.
|
|||
|
|
|||
|
.. admonition:: Merge policies
|
|||
|
|
|||
|
To do this, the aggregate framework provides a trait `SparseBinaryMergePolicy <https://cgit.twitter.biz/source/tree/timelines/data_processing/ml_util/aggregation_framework/conversion/SparseBinaryMergePolicy.scala>`_. Classes overriding this trait define policies
|
|||
|
that state how to merge the individual aggregate features from each sparse binary value (in this case, each `INTEREST_ID` for a user).
|
|||
|
Furthermore, we provide `SparseBinaryMultipleAggregateJoin` which executes these policies to merge aggregates.
|
|||
|
|
|||
|
A simple policy might simply average all the counts from the individual interests, or just take the max, or
|
|||
|
a specific quantile. More advanced policies might use custom criteria to decide which interest is most relevant and choose
|
|||
|
features from that interest to represent the user, or use some weighted combination of counts.
|
|||
|
|
|||
|
The framework provides two simple in-built policies (`PickTopCtrPolicy <https://cgit.twitter.biz/source/tree/timelines/data_processing/ml_util/aggregation_framework/conversion/PickTopCtrPolicy.scala>`_
|
|||
|
and `CombineCountsPolicy <https://cgit.twitter.biz/source/tree/timelines/data_processing/ml_util/aggregation_framework/conversion/CombineCountsPolicy.scala>`_, which keeps the topK counts per
|
|||
|
record) that you can get started with, though you likely want to implement your own policy based on domain knowledge to get
|
|||
|
the best results for your specific problem domain.
|
|||
|
|
|||
|
.. admonition:: Offline Code Example
|
|||
|
|
|||
|
The scalding job `TrainingDataWithAggV2Generator <https://cgit.twitter.biz/source/tree/timelines/data_processing/ad_hoc/recap/training_data_generator/TrainingDataWithAggV2Generator.scala>`_ shows how multiple merge policies are defined and implemented to merge aggregates on sparse binary keys to the TQ's training data records.
|
|||
|
|
|||
|
.. admonition:: Online Code Example
|
|||
|
|
|||
|
In our (non-FeatureStore enabled) online code path, we merge aggregates on sparse binary keys using the `CombineCountsPolicy <https://cgit.twitter.biz/source/tree/timelinemixer/server/src/main/scala/com/twitter/timelinemixer/injection/recapbase/aggregates/UserFeaturesHydrator.scala#n201>`_.
|