This commit is contained in:
petemihaylov 2023-05-22 17:38:52 -05:00 committed by GitHub
commit 5803804716
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 95 additions and 121 deletions

View File

@ -1,6 +1,6 @@
# CR-Mixer # CR-Mixer
CR-Mixer is a candidate generation service proposed as part of the Personalization Strategy vision for Twitter. Its aim is to speed up the iteration and development of candidate generation and light ranking. The service acts as a lightweight coordinating layer that delegates candidate generation tasks to underlying compute services. It focuses on Twitter's candidate generation use cases and offers a centralized platform for fetching, mixing, and managing candidate sources and light rankers. The overarching goal is to increase the speed and ease of testing and developing candidate generation pipelines, ultimately delivering more value to Twitter users. CR-Mixer is a candidate generation service proposed as part of the Personalization Strategy vision for Twitter. It aims to speed up the iteration and development of candidate generation and light ranking. The service acts as a lightweight coordinating layer that delegates candidate generation tasks to underlying compute services. It focuses on Twitter's candidate generation use cases and offers a centralized platform for fetching, mixing, and managing candidate sources and light rankers. The overarching goal is to increase the speed and ease of testing and developing candidate generation pipelines, ultimately delivering more value to Twitter users.
CR-Mixer acts as a configurator and delegator, providing abstractions for the challenging parts of candidate generation and handling performance issues. It will offer a 1-stop-shop for fetching and mixing candidate sources, a managed and shared performant platform, a light ranking layer, a common filtering layer, a version control system, a co-owned feature switch set, and peripheral tooling. CR-Mixer acts as a configurator and delegator, providing abstractions for the challenging parts of candidate generation and handling performance issues. It will offer a 1-stop-shop for fetching and mixing candidate sources, a managed and shared performant platform, a light ranking layer, a common filtering layer, a version control system, a co-owned feature switch set, and peripheral tooling.

View File

@ -2,11 +2,11 @@
# https://sourcegraph.twitter.biz/git.twitter.biz/source/-/blob/cr-mixer/server/src/main/scala/com/twitter/cr_mixer/param/decider/DeciderKey.scala # https://sourcegraph.twitter.biz/git.twitter.biz/source/-/blob/cr-mixer/server/src/main/scala/com/twitter/cr_mixer/param/decider/DeciderKey.scala
dark_traffic_filter: dark_traffic_filter:
comment: Proportion of the requests that are forwarded as dark traffic to the proxy comment: "Proportion of the requests that are forwarded as dark traffic to the proxy"
default_availability: 0 default_availability: 0
enable_tweet_recommendations_home_product: enable_tweet_recommendations_home_product:
comment: Proportion of requests where we return an actual response for TweetRecommendations Home product comment: "Proportion of requests where we return an actual response for TweetRecommendations Home product"
default_availability: 10000 default_availability: 10000
enable_tweet_health_score: enable_tweet_health_score:

View File

@ -26,18 +26,17 @@ case class AdsBlender @Inject() (globalStats: StatsReceiver) {
def blend( def blend(
inputCandidates: Seq[Seq[InitialAdsCandidate]], inputCandidates: Seq[Seq[InitialAdsCandidate]],
): Future[Seq[BlendedAdsCandidate]] = { ): Future[Seq[BlendedAdsCandidate]] = {
// Filter out empty candidate sequence.
// Filter out empty candidate sequence
val candidates = inputCandidates.filter(_.nonEmpty) val candidates = inputCandidates.filter(_.nonEmpty)
val (interestedInCandidates, twistlyCandidates) = val (interestedInCandidates, twistlyCandidates) =
candidates.partition(_.head.candidateGenerationInfo.sourceInfoOpt.isEmpty) candidates.partition(_.head.candidateGenerationInfo.sourceInfoOpt.isEmpty)
// First interleave twistly candidates // Interleave twistly candidates.
val interleavedTwistlyCandidates = InterleaveUtil.interleave(twistlyCandidates) val interleavedTwistlyCandidates = InterleaveUtil.interleave(twistlyCandidates)
val twistlyAndInterestedInCandidates = val twistlyAndInterestedInCandidates =
Seq(interestedInCandidates.flatten, interleavedTwistlyCandidates) Seq(interestedInCandidates.flatten, interleavedTwistlyCandidates)
// then interleave twistly candidates with interested in to make them even // Interleave twistly candidates with interested in to make them even.
val interleavedCandidates = InterleaveUtil.interleave(twistlyAndInterestedInCandidates) val interleavedCandidates = InterleaveUtil.interleave(twistlyAndInterestedInCandidates)
stats.stat("candidates").add(interleavedCandidates.size) stats.stat("candidates").add(interleavedCandidates.size)
@ -45,6 +44,7 @@ case class AdsBlender @Inject() (globalStats: StatsReceiver) {
val blendedCandidates = buildBlendedAdsCandidate(inputCandidates, interleavedCandidates) val blendedCandidates = buildBlendedAdsCandidate(inputCandidates, interleavedCandidates)
Future.value(blendedCandidates) Future.value(blendedCandidates)
} }
private def buildBlendedAdsCandidate( private def buildBlendedAdsCandidate(
inputCandidates: Seq[Seq[InitialAdsCandidate]], inputCandidates: Seq[Seq[InitialAdsCandidate]],
interleavedCandidates: Seq[InitialAdsCandidate] interleavedCandidates: Seq[InitialAdsCandidate]
@ -73,5 +73,4 @@ case class AdsBlender @Inject() (globalStats: StatsReceiver) {
} }
tweetIdMap.toMap tweetIdMap.toMap
} }
} }

View File

@ -9,7 +9,7 @@ import scala.collection.mutable
object BlendedCandidatesBuilder { object BlendedCandidatesBuilder {
/** /**
* @param inputCandidates input candidate prior to interleaving * @param inputCandidates input candidate prior to interleaving.
* @param interleavedCandidates after interleaving. These tweets are de-duplicated. * @param interleavedCandidates after interleaving. These tweets are de-duplicated.
*/ */
def build( def build(
@ -23,8 +23,8 @@ object BlendedCandidatesBuilder {
} }
/** /**
* This function tells you which CandidateGenerationInfo generated a given tweet.
* The same tweet can be generated by different sources. * The same tweet can be generated by different sources.
* This function tells you which CandidateGenerationInfo generated a given tweet
*/ */
private def buildCandidateToCGInfosMap( private def buildCandidateToCGInfosMap(
candidateSeq: Seq[Seq[InitialCandidate]], candidateSeq: Seq[Seq[InitialCandidate]],
@ -44,5 +44,4 @@ object BlendedCandidatesBuilder {
} }
tweetIdMap.toMap tweetIdMap.toMap
} }
} }

View File

@ -17,7 +17,7 @@ case class ContentSignalBlender @Inject() (globalStats: StatsReceiver) {
private val stats: StatsReceiver = globalStats.scope(name) private val stats: StatsReceiver = globalStats.scope(name)
/** /**
* Exposes multiple types of sorting relying only on Content Based signals * Exposes multiple types of sorting relying only on Content Based signals.
* Candidate Recency, Random, FavoriteCount and finally Standardized, which standardizes the scores * Candidate Recency, Random, FavoriteCount and finally Standardized, which standardizes the scores
* that come from the active SimilarityEngine and then sorts on the standardized scores. * that come from the active SimilarityEngine and then sorts on the standardized scores.
*/ */
@ -25,7 +25,7 @@ case class ContentSignalBlender @Inject() (globalStats: StatsReceiver) {
params: Params, params: Params,
inputCandidates: Seq[Seq[InitialCandidate]], inputCandidates: Seq[Seq[InitialCandidate]],
): Future[Seq[BlendedCandidate]] = { ): Future[Seq[BlendedCandidate]] = {
// Filter out empty candidate sequence // Filter out empty candidate sequence.
val candidates = inputCandidates.filter(_.nonEmpty) val candidates = inputCandidates.filter(_.nonEmpty)
val sortedCandidates = params(BlenderParams.ContentBlenderTypeSortingAlgorithmParam) match { val sortedCandidates = params(BlenderParams.ContentBlenderTypeSortingAlgorithmParam) match {
case BlenderParams.ContentBasedSortingAlgorithmEnum.CandidateRecency => case BlenderParams.ContentBasedSortingAlgorithmEnum.CandidateRecency =>

View File

@ -70,9 +70,9 @@ object CountWeightedInterleaveBlender {
* We pass two parameters to the weighted interleaver: * We pass two parameters to the weighted interleaver:
* @param rankerWeightShrinkage shrinkage parameter between [0, 1] that determines how close we * @param rankerWeightShrinkage shrinkage parameter between [0, 1] that determines how close we
* stay to uniform sampling. The bigger the shrinkage the * stay to uniform sampling. The bigger the shrinkage the
* closer we are to uniform round robin * closer we are to uniform round robin.
* @param maxWeightAdjustments max number of weighted sampling to do prior to defaulting to * @param maxWeightAdjustments max number of weighted sampling to do prior to defaulting to
* uniform. Set so that we avoid infinite loops (e.g. if weights are * uniform. Set so that we avoid infinite loops (e.g. if weights are
* 0) * 0)
*/ */
case class WeightedBlenderQuery( case class WeightedBlenderQuery(

View File

@ -18,22 +18,22 @@ case class SourceTypeBackFillBlender @Inject() (globalStats: StatsReceiver) {
private val stats: StatsReceiver = globalStats.scope(name) private val stats: StatsReceiver = globalStats.scope(name)
/** /**
* Partition the candidates based on source type * Partition the candidates based on source type.
* Interleave the two partitions of candidates separately * Interleave the two partitions of candidates separately
* Then append the back fill candidates to the end * and then append the back fill candidates to the end.
*/ */
def blend( def blend(
params: Params, params: Params,
inputCandidates: Seq[Seq[InitialCandidate]], inputCandidates: Seq[Seq[InitialCandidate]],
): Future[Seq[BlendedCandidate]] = { ): Future[Seq[BlendedCandidate]] = {
// Filter out empty candidate sequence // Filter out empty candidate sequence.
val candidates = inputCandidates.filter(_.nonEmpty) val candidates = inputCandidates.filter(_.nonEmpty)
val backFillSourceTypes = val backFillSourceTypes =
if (params(BlenderParams.SourceTypeBackFillEnableVideoBackFill)) BackFillSourceTypesWithVideo if (params(BlenderParams.SourceTypeBackFillEnableVideoBackFill)) BackFillSourceTypesWithVideo
else BackFillSourceTypes else BackFillSourceTypes
// partition candidates based on their source types // Partition candidates based on their source types.
val (backFillCandidates, regularCandidates) = val (backFillCandidates, regularCandidates) =
candidates.partition( candidates.partition(
_.head.candidateGenerationInfo.sourceInfoOpt _.head.candidateGenerationInfo.sourceInfoOpt
@ -43,7 +43,7 @@ case class SourceTypeBackFillBlender @Inject() (globalStats: StatsReceiver) {
val interleavedBackFillCandidates = val interleavedBackFillCandidates =
InterleaveUtil.interleave(backFillCandidates) InterleaveUtil.interleave(backFillCandidates)
stats.stat("backFillCandidates").add(interleavedBackFillCandidates.size) stats.stat("backFillCandidates").add(interleavedBackFillCandidates.size)
// Append interleaved backfill candidates to the end // Append interleaved backfill candidates to the end.
val interleavedCandidates = interleavedRegularCandidates ++ interleavedBackFillCandidates val interleavedCandidates = interleavedRegularCandidates ++ interleavedBackFillCandidates
stats.stat("candidates").add(interleavedCandidates.size) stats.stat("candidates").add(interleavedCandidates.size)
@ -51,7 +51,6 @@ case class SourceTypeBackFillBlender @Inject() (globalStats: StatsReceiver) {
val blendedCandidates = BlendedCandidatesBuilder.build(inputCandidates, interleavedCandidates) val blendedCandidates = BlendedCandidatesBuilder.build(inputCandidates, interleavedCandidates)
Future.value(blendedCandidates) Future.value(blendedCandidates)
} }
} }
object ImplicitSignalBackFillBlender { object ImplicitSignalBackFillBlender {

View File

@ -27,7 +27,7 @@ case class SwitchBlender @Inject() (
userState: UserState, userState: UserState,
inputCandidates: Seq[Seq[InitialCandidate]], inputCandidates: Seq[Seq[InitialCandidate]],
): Future[Seq[BlendedCandidate]] = { ): Future[Seq[BlendedCandidate]] = {
// Take out empty seq // Take out the empty seq.
val nonEmptyCandidates = inputCandidates.collect { val nonEmptyCandidates = inputCandidates.collect {
case candidates if candidates.nonEmpty => case candidates if candidates.nonEmpty =>
candidates candidates
@ -43,7 +43,7 @@ case class SwitchBlender @Inject() (
} }
val candidatesToBlend = nonEmptyCandidates.sortBy(_.head)(innerSignalSorting) val candidatesToBlend = nonEmptyCandidates.sortBy(_.head)(innerSignalSorting)
// Blend based on specified blender rules // Blend based on specified blender rules.
params(BlenderParams.BlendingAlgorithmParam) match { params(BlenderParams.BlendingAlgorithmParam) match {
case BlendingAlgorithmEnum.RoundRobin => case BlendingAlgorithmEnum.RoundRobin =>
defaultBlender.blend(candidatesToBlend) defaultBlender.blend(candidatesToBlend)

View File

@ -39,24 +39,25 @@ class AdsCandidateGenerator @Inject() (
def get(query: AdsCandidateGeneratorQuery): Future[Seq[RankedAdsCandidate]] = { def get(query: AdsCandidateGeneratorQuery): Future[Seq[RankedAdsCandidate]] = {
val allStats = stats.scope("all") val allStats = stats.scope("all")
val perProductStats = stats.scope("perProduct", query.product.toString) val perProductStats = stats.scope("perProduct", query.product.toString)
StatsUtil.trackItemsStats(allStats) { StatsUtil.trackItemsStats(allStats) {
StatsUtil.trackItemsStats(perProductStats) { StatsUtil.trackItemsStats(perProductStats) {
for { for {
// fetch source signals // Fetch source signals.
sourceSignals <- StatsUtil.trackBlockStats(fetchSourcesStats) { sourceSignals <- StatsUtil.trackBlockStats(fetchSourcesStats) {
fetchSources(query) fetchSources(query)
} }
realGraphSeeds <- StatsUtil.trackItemMapStats(fetchRealGraphSeedsStats) { realGraphSeeds <- StatsUtil.trackItemMapStats(fetchRealGraphSeedsStats) {
fetchSeeds(query) fetchSeeds(query)
} }
// get initial candidates from similarity engines
// hydrate lineItemInfo and filter out non active ads // Get initial candidates from similarity engines.
// Hydrate lineItemInfo and filter out non active ads.
initialCandidates <- StatsUtil.trackBlockStats(fetchCandidatesStats) { initialCandidates <- StatsUtil.trackBlockStats(fetchCandidatesStats) {
fetchCandidates(query, sourceSignals, realGraphSeeds) fetchCandidates(query, sourceSignals, realGraphSeeds)
} }
// blend candidates // Blend candidates.
blendedCandidates <- StatsUtil.trackItemsStats(interleaveStats) { blendedCandidates <- StatsUtil.trackItemsStats(interleaveStats) {
interleave(initialCandidates) interleave(initialCandidates)
} }
@ -73,7 +74,6 @@ class AdsCandidateGenerator @Inject() (
} }
} }
} }
} }
def fetchSources( def fetchSources(
@ -95,7 +95,6 @@ class AdsCandidateGenerator @Inject() (
.fetchCandidates(query.userId, sourceSignals, realGraphSeeds, query.params), .fetchCandidates(query.userId, sourceSignals, realGraphSeeds, query.params),
query.params(AdsParams.EnableScribe) query.params(AdsParams.EnableScribe)
) )
} }
private def fetchSeeds( private def fetchSeeds(
@ -121,7 +120,6 @@ class AdsCandidateGenerator @Inject() (
scoreBoostFactor: Double, scoreBoostFactor: Double,
statsReceiver: StatsReceiver, statsReceiver: StatsReceiver,
): Future[Seq[RankedAdsCandidate]] = { ): Future[Seq[RankedAdsCandidate]] = {
val candidateSize = candidates.size val candidateSize = candidates.size
val rankedCandidates = candidates.zipWithIndex.map { val rankedCandidates = candidates.zipWithIndex.map {
case (candidate, index) => case (candidate, index) =>

View File

@ -90,7 +90,6 @@ case class AdsCandidateSourcesRouter @Inject() (
realGraphSeeds: Map[UserId, Double], realGraphSeeds: Map[UserId, Double],
params: configapi.Params params: configapi.Params
): Future[Seq[Seq[InitialAdsCandidate]]] = { ): Future[Seq[Seq[InitialAdsCandidate]]] = {
val simClustersANN1ConfigId = params(SimClustersANNParams.SimClustersANN1ConfigId) val simClustersANN1ConfigId = params(SimClustersANNParams.SimClustersANN1ConfigId)
val tweetBasedSANNMinScore = params( val tweetBasedSANNMinScore = params(
@ -296,7 +295,7 @@ case class AdsCandidateSourcesRouter @Inject() (
params params
) )
// dark traffic to simclusters-ann-2 // Dark traffic to simclusters-ann-2
if (decider.isAvailable(DeciderConstants.enableSimClustersANN2DarkTrafficDeciderKey)) { if (decider.isAvailable(DeciderConstants.enableSimClustersANN2DarkTrafficDeciderKey)) {
val simClustersANN2ConfigId = params(SimClustersANNParams.SimClustersANN2ConfigId) val simClustersANN2ConfigId = params(SimClustersANNParams.SimClustersANN2ConfigId)
val sann2Query = SimClustersANNSimilarityEngine.fromParams( val sann2Query = SimClustersANNSimilarityEngine.fromParams(
@ -329,7 +328,6 @@ case class AdsCandidateSourcesRouter @Inject() (
sourceInfo: Option[SourceInfo], sourceInfo: Option[SourceInfo],
params: configapi.Params params: configapi.Params
) = { ) = {
val query = ProducerBasedUserAdGraphSimilarityEngine.fromParams( val query = ProducerBasedUserAdGraphSimilarityEngine.fromParams(
sourceInfo.get.internalId, sourceInfo.get.internalId,
params params
@ -352,7 +350,6 @@ case class AdsCandidateSourcesRouter @Inject() (
sourceInfo: Option[SourceInfo], sourceInfo: Option[SourceInfo],
params: configapi.Params params: configapi.Params
) = { ) = {
val query = TweetBasedUserAdGraphSimilarityEngine.fromParams( val query = TweetBasedUserAdGraphSimilarityEngine.fromParams(
sourceInfo.get.internalId, sourceInfo.get.internalId,
params params
@ -375,7 +372,6 @@ case class AdsCandidateSourcesRouter @Inject() (
realGraphSeeds: Map[UserId, Double], realGraphSeeds: Map[UserId, Double],
params: configapi.Params params: configapi.Params
) = { ) = {
val query = ConsumersBasedUserAdGraphSimilarityEngine val query = ConsumersBasedUserAdGraphSimilarityEngine
.fromParams(realGraphSeeds, params) .fromParams(realGraphSeeds, params)
@ -394,7 +390,7 @@ case class AdsCandidateSourcesRouter @Inject() (
CandidateGenerationInfo( CandidateGenerationInfo(
Some(sourceInfo), Some(sourceInfo),
similarityEngineInfo, similarityEngineInfo,
Seq.empty // Atomic Similarity Engine. Hence it has no contributing SEs Seq.empty // Atomic Similarity Engine. Hence it has no contributing SEs.
) )
) )
}) })
@ -404,7 +400,7 @@ case class AdsCandidateSourcesRouter @Inject() (
similarityEngine: HnswANNSimilarityEngine, similarityEngine: HnswANNSimilarityEngine,
similarityEngineType: SimilarityEngineType, similarityEngineType: SimilarityEngineType,
requestUserId: UserId, requestUserId: UserId,
sourceInfo: Option[SourceInfo], // if none, then it's consumer-based similarity engine sourceInfo: Option[SourceInfo], // If none, then it's consumer-based similarity engine.
model: String model: String
): Future[Seq[TweetWithCandidateGenerationInfo]] = { ): Future[Seq[TweetWithCandidateGenerationInfo]] = {
val internalId = val internalId =
@ -455,7 +451,7 @@ case class AdsCandidateSourcesRouter @Inject() (
CandidateGenerationInfo( CandidateGenerationInfo(
None, None,
similarityEngineInfo, similarityEngineInfo,
Seq.empty // Atomic Similarity Engine. Hence it has no contributing SEs Seq.empty // Atomic Similarity Engine. Hence it has no contributing SEs.
) )
) )
} }

View File

@ -96,7 +96,6 @@ case class CandidateSourcesRouter @Inject() (
sourceGraphs: Map[String, Option[GraphSourceInfo]], sourceGraphs: Map[String, Option[GraphSourceInfo]],
params: configapi.Params, params: configapi.Params,
): Future[Seq[Seq[InitialCandidate]]] = { ): Future[Seq[Seq[InitialCandidate]]] = {
val tweetBasedCandidatesFuture = getCandidates( val tweetBasedCandidatesFuture = getCandidates(
getTweetBasedSourceInfo(sourceSignals), getTweetBasedSourceInfo(sourceSignals),
params, params,
@ -225,7 +224,7 @@ case class CandidateSourcesRouter @Inject() (
consumersBasedUvgRealGraphInCandidatesFuture, consumersBasedUvgRealGraphInCandidatesFuture,
customizedRetrievalBasedCandidatesFuture customizedRetrievalBasedCandidatesFuture
)).map { candidatesList => )).map { candidatesList =>
// remove empty innerSeq // Remove empty innerSeq.
val result = candidatesList.flatten.filter(_.nonEmpty) val result = candidatesList.flatten.filter(_.nonEmpty)
stats.stat("numOfSequences").add(result.size) stats.stat("numOfSequences").add(result.size)
stats.stat("flattenCandidatesWithDup").add(result.flatten.size) stats.stat("flattenCandidatesWithDup").add(result.flatten.size)
@ -262,7 +261,7 @@ case class CandidateSourcesRouter @Inject() (
CandidateGenerationInfo( CandidateGenerationInfo(
sourceInfo, sourceInfo,
similarityEngineInfo, similarityEngineInfo,
Seq.empty // Atomic Similarity Engine. Hence it has no contributing SEs Seq.empty // Atomic Similarity Engine. Hence it has no contributing SEs.
) )
) )
} }
@ -330,7 +329,7 @@ case class CandidateSourcesRouter @Inject() (
CandidateGenerationInfo( CandidateGenerationInfo(
None, None,
similarityEngineInfo, similarityEngineInfo,
Seq.empty // Atomic Similarity Engine. Hence it has no contributing SEs Seq.empty // Atomic Similarity Engine. Hence it has no contributing SEs.
) )
) )
} }
@ -358,7 +357,7 @@ case class CandidateSourcesRouter @Inject() (
engine.getCandidates(EngineQuery(query, params)).map { engine.getCandidates(EngineQuery(query, params)).map {
_.map { _.map {
_.map { tweetWithScore => _.map { tweetWithScore =>
// define filters // Define filters.
TweetWithCandidateGenerationInfo( TweetWithCandidateGenerationInfo(
tweetWithScore.tweetId, tweetWithScore.tweetId,
CandidateGenerationInfo( CandidateGenerationInfo(
@ -401,7 +400,7 @@ case class CandidateSourcesRouter @Inject() (
CandidateGenerationInfo( CandidateGenerationInfo(
None, None,
similarityEngineInfo, similarityEngineInfo,
Seq.empty // Atomic Similarity Engine. Hence it has no contributing SEs Seq.empty // Atomic Similarity Engine. Hence it has no contributing SEs.
) )
) )
} }

View File

@ -48,7 +48,6 @@ class CrCandidateGenerator @Inject() (
timeoutConfig: TimeoutConfig, timeoutConfig: TimeoutConfig,
globalStats: StatsReceiver) { globalStats: StatsReceiver) {
private val timer: Timer = new JavaTimer(true) private val timer: Timer = new JavaTimer(true)
private val stats: StatsReceiver = globalStats.scope(this.getClass.getCanonicalName) private val stats: StatsReceiver = globalStats.scope(this.getClass.getCanonicalName)
private val fetchSourcesStats = stats.scope("fetchSources") private val fetchSourcesStats = stats.scope("fetchSources")
@ -78,14 +77,14 @@ class CrCandidateGenerator @Inject() (
fetchSources(query) fetchSources(query)
} }
initialCandidates <- StatsUtil.trackBlockStats(fetchCandidatesAfterFilterStats) { initialCandidates <- StatsUtil.trackBlockStats(fetchCandidatesAfterFilterStats) {
// find the positive and negative signals // Find the positive and negative signals.
val (positiveSignals, negativeSignals) = sourceSignals.partition { signal => val (positiveSignals, negativeSignals) = sourceSignals.partition { signal =>
!EnabledNegativeSourceTypes.contains(signal.sourceType) !EnabledNegativeSourceTypes.contains(signal.sourceType)
} }
fetchPositiveSourcesStats.stat("size").add(positiveSignals.size) fetchPositiveSourcesStats.stat("size").add(positiveSignals.size)
fetchNegativeSourcesStats.stat("size").add(negativeSignals.size) fetchNegativeSourcesStats.stat("size").add(negativeSignals.size)
// find the positive signals to keep, removing block and muted users // Find the positive signals to keep, removing blocked and muted users.
val filteredSourceInfo = val filteredSourceInfo =
if (negativeSignals.nonEmpty && query.params( if (negativeSignals.nonEmpty && query.params(
RecentNegativeSignalParams.EnableSourceParam)) { RecentNegativeSignalParams.EnableSourceParam)) {
@ -94,7 +93,7 @@ class CrCandidateGenerator @Inject() (
positiveSignals positiveSignals
} }
// fetch candidates from the positive signals // Fetch candidates from the positive signals.
StatsUtil.trackBlockStats(fetchCandidatesStats) { StatsUtil.trackBlockStats(fetchCandidatesStats) {
fetchCandidates(query, filteredSourceInfo, sourceGraphsMap) fetchCandidates(query, filteredSourceInfo, sourceGraphsMap)
} }

View File

@ -28,7 +28,7 @@ import javax.inject.Singleton
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
/** /**
* A candidate generator that fetches similar tweets from multiple customized retrieval based candidate sources * A candidate generator that fetches similar tweets from multiple customized retrieval based candidate sources.
* *
* Different from [[TweetBasedCandidateGeneration]], this store returns candidates from different * Different from [[TweetBasedCandidateGeneration]], this store returns candidates from different
* similarity engines without blending. In other words, this class shall not be thought of as a * similarity engines without blending. In other words, this class shall not be thought of as a

View File

@ -105,7 +105,7 @@ class FrsTweetCandidateGenerator @Inject() (
} }
/** /**
* Fetch recommended seed users from FRS * Fetch recommended seed users from FRS.
*/ */
private def fetchSeeds( private def fetchSeeds(
userId: UserId, userId: UserId,
@ -131,7 +131,7 @@ class FrsTweetCandidateGenerator @Inject() (
} }
/** /**
* Fetch tweet candidates from Earlybird * Fetch tweet candidates from Earlybird.
*/ */
private def fetchCandidates( private def fetchCandidates(
searcherUserId: UserId, searcherUserId: UserId,
@ -141,7 +141,7 @@ class FrsTweetCandidateGenerator @Inject() (
params: Params params: Params
): Future[Option[Seq[TweetWithAuthor]]] = { ): Future[Option[Seq[TweetWithAuthor]]] = {
if (seedAuthors.nonEmpty) { if (seedAuthors.nonEmpty) {
// call earlybird // Call Earlybird.
val query = EarlybirdSimilarityEngineRouter.queryFromParams( val query = EarlybirdSimilarityEngineRouter.queryFromParams(
Some(searcherUserId), Some(searcherUserId),
seedAuthors, seedAuthors,
@ -154,7 +154,7 @@ class FrsTweetCandidateGenerator @Inject() (
} }
/** /**
* Filter candidates that do not pass visibility filter policy * Filter candidates that do not pass visibility filter policy.
*/ */
private def filterCandidates( private def filterCandidates(
candidates: Option[Seq[TweetWithAuthor]], candidates: Option[Seq[TweetWithAuthor]],
@ -175,7 +175,7 @@ class FrsTweetCandidateGenerator @Inject() (
} }
/** /**
* Hydrate the candidates with the FRS candidate sources and scores * Hydrate the candidates with the FRS candidate sources and scores.
*/ */
private def hydrateCandidates( private def hydrateCandidates(
frsAuthorWithScores: Option[Map[UserId, FrsQueryResult]], frsAuthorWithScores: Option[Map[UserId, FrsQueryResult]],
@ -193,8 +193,8 @@ class FrsTweetCandidateGenerator @Inject() (
frsCandidateSourceScores = frsQueryResult.flatMap { result => frsCandidateSourceScores = frsQueryResult.flatMap { result =>
result.sourceWithScores.map { result.sourceWithScores.map {
_.collect { _.collect {
// see TokenStrToAlgorithmMap @ https://sourcegraph.twitter.biz/git.twitter.biz/source/-/blob/hermit/hermit-core/src/main/scala/com/twitter/hermit/constants/AlgorithmFeedbackTokens.scala // See TokenStrToAlgorithmMap @ https://sourcegraph.twitter.biz/git.twitter.biz/source/-/blob/hermit/hermit-core/src/main/scala/com/twitter/hermit/constants/AlgorithmFeedbackTokens.scala
// see Algorithm @ https://sourcegraph.twitter.biz/git.twitter.biz/source/-/blob/hermit/hermit-core/src/main/scala/com/twitter/hermit/model/Algorithm.scala // See Algorithm @ https://sourcegraph.twitter.biz/git.twitter.biz/source/-/blob/hermit/hermit-core/src/main/scala/com/twitter/hermit/model/Algorithm.scala
case (candidateSourceAlgoStr, score) case (candidateSourceAlgoStr, score)
if AlgorithmFeedbackTokens.TokenStrToAlgorithmMap.contains( if AlgorithmFeedbackTokens.TokenStrToAlgorithmMap.contains(
candidateSourceAlgoStr) => candidateSourceAlgoStr) =>
@ -210,7 +210,6 @@ class FrsTweetCandidateGenerator @Inject() (
} }
} }
} }
} }
object FrsTweetCandidateGenerator { object FrsTweetCandidateGenerator {

View File

@ -43,7 +43,6 @@ class RelatedTweetCandidateGenerator @Inject() (
def get( def get(
query: RelatedTweetCandidateGeneratorQuery query: RelatedTweetCandidateGeneratorQuery
): Future[Seq[InitialCandidate]] = { ): Future[Seq[InitialCandidate]] = {
val allStats = stats.scope("all") val allStats = stats.scope("all")
val perProductStats = stats.scope("perProduct", query.product.toString) val perProductStats = stats.scope("perProduct", query.product.toString)
StatsUtil.trackItemsStats(allStats) { StatsUtil.trackItemsStats(allStats) {
@ -90,9 +89,9 @@ class RelatedTweetCandidateGenerator @Inject() (
} }
/*** /***
* fetch Candidates from TweetBased/ProducerBased Unified Similarity Engine, * Fetch Candidates from TweetBased/ProducerBased Unified Similarity Engine,
* and apply VF filter based on TweetInfoStore * and apply VF filter based on TweetInfoStore.
* To align with the downstream processing (filter, rank), we tend to return a Seq[Seq[InitialCandidate]] * To align with the downstream processing (filter, rank), we tend to return a Seq[Seq[InitialCandidate]]
* instead of a Seq[Candidate] even though we only have a Seq in it. * instead of a Seq[Candidate] even though we only have a Seq in it.
*/ */
private def getCandidatesFromSimilarityEngine[QueryType]( private def getCandidatesFromSimilarityEngine[QueryType](
@ -103,7 +102,7 @@ class RelatedTweetCandidateGenerator @Inject() (
/*** /***
* We wrap the query to be a Seq of queries for the Sim Engine to ensure evolvability of candidate generation * We wrap the query to be a Seq of queries for the Sim Engine to ensure evolvability of candidate generation
* and as a result, it will return Seq[Seq[InitialCandidate]] * and as a result, it will return Seq[Seq[InitialCandidate]].
*/ */
val engineQueries = val engineQueries =
Seq(fromParamsForRelatedTweet(query.internalId, query.params)) Seq(fromParamsForRelatedTweet(query.internalId, query.params))
@ -138,7 +137,7 @@ class RelatedTweetCandidateGenerator @Inject() (
Future.collect(tweetInfoStore.multiGet(tweetIds)).map { tweetInfos => Future.collect(tweetInfoStore.multiGet(tweetIds)).map { tweetInfos =>
/*** /***
* If tweetInfo does not exist, we will filter out this tweet candidate. * If tweetInfo does not exist, we will filter out this tweet candidate.
* This tweetInfo filter also acts as the VF filter * This tweetInfo filter also acts as the VF filter.
*/ */
candidates.collect { candidates.collect {
case candidate if tweetInfos.getOrElse(candidate.tweetId, None).isDefined => case candidate if tweetInfos.getOrElse(candidate.tweetId, None).isDefined =>

View File

@ -36,7 +36,6 @@ class RelatedVideoTweetCandidateGenerator @Inject() (
def get( def get(
query: RelatedVideoTweetCandidateGeneratorQuery query: RelatedVideoTweetCandidateGeneratorQuery
): Future[Seq[InitialCandidate]] = { ): Future[Seq[InitialCandidate]] = {
val allStats = stats.scope("all") val allStats = stats.scope("all")
val perProductStats = stats.scope("perProduct", query.product.toString) val perProductStats = stats.scope("perProduct", query.product.toString)
StatsUtil.trackItemsStats(allStats) { StatsUtil.trackItemsStats(allStats) {
@ -75,8 +74,8 @@ class RelatedVideoTweetCandidateGenerator @Inject() (
} }
/*** /***
* fetch Candidates from TweetBased/ProducerBased Unified Similarity Engine, * Fetch Candidates from TweetBased/ProducerBased Unified Similarity Engine,
* and apply VF filter based on TweetInfoStore * and apply VF filter based on TweetInfoStore.
* To align with the downstream processing (filter, rank), we tend to return a Seq[Seq[InitialCandidate]] * To align with the downstream processing (filter, rank), we tend to return a Seq[Seq[InitialCandidate]]
* instead of a Seq[Candidate] even though we only have a Seq in it. * instead of a Seq[Candidate] even though we only have a Seq in it.
*/ */
@ -88,7 +87,7 @@ class RelatedVideoTweetCandidateGenerator @Inject() (
/*** /***
* We wrap the query to be a Seq of queries for the Sim Engine to ensure evolvability of candidate generation * We wrap the query to be a Seq of queries for the Sim Engine to ensure evolvability of candidate generation
* and as a result, it will return Seq[Seq[InitialCandidate]] * and as a result, it will return Seq[Seq[InitialCandidate]].
*/ */
val engineQueries = val engineQueries =
Seq(fromParamsForRelatedVideoTweet(query.internalId, query.params)) Seq(fromParamsForRelatedVideoTweet(query.internalId, query.params))
@ -121,7 +120,7 @@ class RelatedVideoTweetCandidateGenerator @Inject() (
Future.collect(tweetInfoStore.multiGet(tweetIds)).map { tweetInfos => Future.collect(tweetInfoStore.multiGet(tweetIds)).map { tweetInfos =>
/*** /***
* If tweetInfo does not exist, we will filter out this tweet candidate. * If tweetInfo does not exist, we will filter out this tweet candidate.
* This tweetInfo filter also acts as the VF filter * This tweetInfo filter also acts as the VF filter.
*/ */
candidates.collect { candidates.collect {
case candidate if tweetInfos.getOrElse(candidate.tweetId, None).isDefined => case candidate if tweetInfos.getOrElse(candidate.tweetId, None).isDefined =>

View File

@ -179,7 +179,7 @@ case class SimClustersInterestedInCandidateGeneration @Inject() (
else else
Future.None Future.None
// AddressBookInterestedIn Queries // AddressBookInterestedIn Queries.
val userAddressBookInterestedInCandidateResultFut = val userAddressBookInterestedInCandidateResultFut =
if (query.enableAddressBookNextInterestedIn && query.enableProdSimClustersANNSimilarityEngine) if (query.enableAddressBookNextInterestedIn && query.enableProdSimClustersANNSimilarityEngine)
getInterestedInCandidateResult( getInterestedInCandidateResult(
@ -397,7 +397,7 @@ object SimClustersInterestedInCandidateGeneration {
internalId: InternalId, internalId: InternalId,
params: configapi.Params, params: configapi.Params,
): Query = { ): Query = {
// SimClusters common configs // SimClusters common configs.
val simClustersModelVersion = val simClustersModelVersion =
ModelVersions.Enum.enumToSimClustersModelVersionMap(params(GlobalParams.ModelVersionParam)) ModelVersions.Enum.enumToSimClustersModelVersionMap(params(GlobalParams.ModelVersionParam))
val simClustersANNConfigId = params(SimClustersANNParams.SimClustersANNConfigId) val simClustersANNConfigId = params(SimClustersANNParams.SimClustersANNConfigId)
@ -415,13 +415,13 @@ object SimClustersInterestedInCandidateGeneration {
val simClustersAddressBookInterestedInMinScore = params( val simClustersAddressBookInterestedInMinScore = params(
InterestedInParams.MinScoreAddressBookParam) InterestedInParams.MinScoreAddressBookParam)
// InterestedIn embeddings parameters // InterestedIn embeddings parameters.
val interestedInEmbedding = params(InterestedInParams.InterestedInEmbeddingIdParam) val interestedInEmbedding = params(InterestedInParams.InterestedInEmbeddingIdParam)
val nextInterestedInEmbedding = params(InterestedInParams.NextInterestedInEmbeddingIdParam) val nextInterestedInEmbedding = params(InterestedInParams.NextInterestedInEmbeddingIdParam)
val addressbookInterestedInEmbedding = params( val addressbookInterestedInEmbedding = params(
InterestedInParams.AddressBookInterestedInEmbeddingIdParam) InterestedInParams.AddressBookInterestedInEmbeddingIdParam)
// Prod SimClustersANN Query // Prod SimClustersANN Query.
val interestedInSimClustersANNQuery = val interestedInSimClustersANNQuery =
SimClustersANNSimilarityEngine.fromParams( SimClustersANNSimilarityEngine.fromParams(
internalId, internalId,
@ -446,7 +446,7 @@ object SimClustersInterestedInCandidateGeneration {
simClustersANNConfigId, simClustersANNConfigId,
params) params)
// Experimental SANN cluster Query // Experimental SANN cluster Query.
val interestedInExperimentalSimClustersANNQuery = val interestedInExperimentalSimClustersANNQuery =
SimClustersANNSimilarityEngine.fromParams( SimClustersANNSimilarityEngine.fromParams(
internalId, internalId,
@ -471,7 +471,7 @@ object SimClustersInterestedInCandidateGeneration {
experimentalSimClustersANNConfigId, experimentalSimClustersANNConfigId,
params) params)
// SimClusters ANN cluster 1 Query // SimClusters ANN cluster 1 Query.
val interestedInSimClustersANN1Query = val interestedInSimClustersANN1Query =
SimClustersANNSimilarityEngine.fromParams( SimClustersANNSimilarityEngine.fromParams(
internalId, internalId,
@ -496,7 +496,7 @@ object SimClustersInterestedInCandidateGeneration {
simClustersANN1ConfigId, simClustersANN1ConfigId,
params) params)
// SimClusters ANN cluster 2 Query // SimClusters ANN cluster 2 Query.
val interestedInSimClustersANN2Query = val interestedInSimClustersANN2Query =
SimClustersANNSimilarityEngine.fromParams( SimClustersANNSimilarityEngine.fromParams(
internalId, internalId,
@ -521,7 +521,7 @@ object SimClustersInterestedInCandidateGeneration {
simClustersANN2ConfigId, simClustersANN2ConfigId,
params) params)
// SimClusters ANN cluster 3 Query // SimClusters ANN cluster 3 Query.
val interestedInSimClustersANN3Query = val interestedInSimClustersANN3Query =
SimClustersANNSimilarityEngine.fromParams( SimClustersANNSimilarityEngine.fromParams(
internalId, internalId,
@ -546,7 +546,7 @@ object SimClustersInterestedInCandidateGeneration {
simClustersANN3ConfigId, simClustersANN3ConfigId,
params) params)
// SimClusters ANN cluster 5 Query // SimClusters ANN cluster 5 Query.
val interestedInSimClustersANN5Query = val interestedInSimClustersANN5Query =
SimClustersANNSimilarityEngine.fromParams( SimClustersANNSimilarityEngine.fromParams(
internalId, internalId,
@ -554,7 +554,8 @@ object SimClustersInterestedInCandidateGeneration {
simClustersModelVersion, simClustersModelVersion,
simClustersANN5ConfigId, simClustersANN5ConfigId,
params) params)
// SimClusters ANN cluster 4 Query
// SimClusters ANN cluster 4 Query.
val interestedInSimClustersANN4Query = val interestedInSimClustersANN4Query =
SimClustersANNSimilarityEngine.fromParams( SimClustersANNSimilarityEngine.fromParams(
internalId, internalId,

View File

@ -116,7 +116,7 @@ class TopicTweetCandidateGenerator @Inject() (
val tweetIds = candidates.map(_.tweetId).toSet val tweetIds = candidates.map(_.tweetId).toSet
val numTweetsPreFilter = tweetIds.size val numTweetsPreFilter = tweetIds.size
Future.collect(tweetInfoStore.multiGet(tweetIds)).map { tweetInfos => Future.collect(tweetInfoStore.multiGet(tweetIds)).map { tweetInfos =>
/** * /**
* If tweetInfo does not exist, we will filter out this tweet candidate. * If tweetInfo does not exist, we will filter out this tweet candidate.
*/ */
val tweetyPieFilteredInitialCandidates = candidates.collect { val tweetyPieFilteredInitialCandidates = candidates.collect {
@ -142,7 +142,6 @@ class TopicTweetCandidateGenerator @Inject() (
topicId -> tweetyPieFilteredInitialCandidates topicId -> tweetyPieFilteredInitialCandidates
} }
} }
Future.collect(initialCandidates.toSeq).map(_.toMap) Future.collect(initialCandidates.toSeq).map(_.toMap)
} }
@ -152,7 +151,6 @@ class TopicTweetCandidateGenerator @Inject() (
isVideoOnly: Boolean, isVideoOnly: Boolean,
excludeTweetIds: Set[TweetId] excludeTweetIds: Set[TweetId]
): Future[Map[TopicId, Seq[InitialCandidate]]] = { ): Future[Map[TopicId, Seq[InitialCandidate]]] = {
val earliestTweetId = SnowflakeId.firstIdFor(Time.now - maxTweetAge) val earliestTweetId = SnowflakeId.firstIdFor(Time.now - maxTweetAge)
val filteredResults = topicTweetMap.map { val filteredResults = topicTweetMap.map {

View File

@ -46,18 +46,16 @@ class UtegTweetCandidateGenerator @Inject() (
def get( def get(
query: UtegTweetCandidateGeneratorQuery query: UtegTweetCandidateGeneratorQuery
): Future[Seq[TweetWithScoreAndSocialProof]] = { ): Future[Seq[TweetWithScoreAndSocialProof]] = {
val allStats = stats.scope("all") val allStats = stats.scope("all")
val perProductStats = stats.scope("perProduct", query.product.toString) val perProductStats = stats.scope("perProduct", query.product.toString)
StatsUtil.trackItemsStats(allStats) { StatsUtil.trackItemsStats(allStats) {
StatsUtil.trackItemsStats(perProductStats) { StatsUtil.trackItemsStats(perProductStats) {
/** /**
* The candidate we return in the end needs a social proof field, which isn't * The candidate we return in the end needs a social proof field, which isn't
* supported by any existing Candidate type, so we created TweetWithScoreAndSocialProof * supported by any existing Candidate type, so we created TweetWithScoreAndSocialProof
* instead. * instead.
* *
* However, filters and light ranker expect Candidate-typed param to work. In order to minimise the * However, filters and light ranker expect Candidate-typed param to work. In order to minimize the
* changes to them, we are doing conversions from/to TweetWithScoreAndSocialProof to/from Candidate * changes to them, we are doing conversions from/to TweetWithScoreAndSocialProof to/from Candidate
* in this method. * in this method.
*/ */
@ -111,7 +109,6 @@ class UtegTweetCandidateGenerator @Inject() (
candidate.toRankedCandidate(score) candidate.toRankedCandidate(score)
} }
) )
} }
def fetchCandidates( def fetchCandidates(
@ -136,7 +133,7 @@ class UtegTweetCandidateGenerator @Inject() (
): Future[Seq[InitialCandidate]] = { ): Future[Seq[InitialCandidate]] = {
val tweetIds = candidates.map(_.tweetId).toSet val tweetIds = candidates.map(_.tweetId).toSet
Future.collect(tweetInfoStore.multiGet(tweetIds)).map { tweetInfos => Future.collect(tweetInfoStore.multiGet(tweetIds)).map { tweetInfos =>
/** * /**
* If tweetInfo does not exist, we will filter out this tweet candidate. * If tweetInfo does not exist, we will filter out this tweet candidate.
*/ */
candidates.collect { candidates.collect {
@ -172,7 +169,7 @@ class UtegTweetCandidateGenerator @Inject() (
candidate.predictionScore, candidate.predictionScore,
tweet.socialProofByType tweet.socialProofByType
) )
// The exception should never be thrown // The exception should never be thrown.
}.getOrElse(throw new Exception("Cannot find ranked candidate in original UTEG tweets")) }.getOrElse(throw new Exception("Cannot find ranked candidate in original UTEG tweets"))
} }
} }

View File

@ -41,12 +41,11 @@ object SimClustersANNConfig {
annAlgorithm = ScoringAlgorithm.CosineSimilarity, annAlgorithm = ScoringAlgorithm.CosineSimilarity,
) )
/* /**
SimClustersANNConfigId: String * SimClustersANNConfigId: String
Format: Prod - EmbeddingType_ModelVersion_Default * Format: Prod - EmbeddingType_ModelVersion_Default
Format: Experiment - EmbeddingType_ModelVersion_Date_Two-Digit-Serial-Number. Date : YYYYMMDD * Format: Experiment - EmbeddingType_ModelVersion_Date_Two-Digit-Serial-Number. Date : YYYYMMDD
*/ */
private val FavBasedProducer_Model20m145k2020_Default = DefaultConfig.copy() private val FavBasedProducer_Model20m145k2020_Default = DefaultConfig.copy()
// Chunnan's exp on maxTweetCandidateAgeDays 2 // Chunnan's exp on maxTweetCandidateAgeDays 2
@ -142,12 +141,14 @@ object SimClustersANNConfig {
candidateEmbeddingType = EmbeddingType.LogFavBasedRealTimeTweet, candidateEmbeddingType = EmbeddingType.LogFavBasedRealTimeTweet,
maxTweetCandidateAge = 1.hours maxTweetCandidateAge = 1.hours
) )
// SANN-4 config // SANN-4 config
private val LogFavLongestL2EmbeddingTweet_Model20m145k2020_20221220 = private val LogFavLongestL2EmbeddingTweet_Model20m145k2020_20221220 =
LogFavLongestL2EmbeddingTweet_Model20m145k2020_Default.copy( LogFavLongestL2EmbeddingTweet_Model20m145k2020_Default.copy(
candidateEmbeddingType = EmbeddingType.LogFavBasedEvergreenTweet, candidateEmbeddingType = EmbeddingType.LogFavBasedEvergreenTweet,
maxTweetCandidateAge = 48.hours maxTweetCandidateAge = 48.hours
) )
private val UnfilteredUserInterestedIn_Model20m145k2020_Default = DefaultConfig.copy() private val UnfilteredUserInterestedIn_Model20m145k2020_Default = DefaultConfig.copy()
// Chunnan's exp on maxTweetCandidateAgeDays 2 // Chunnan's exp on maxTweetCandidateAgeDays 2
@ -199,6 +200,7 @@ object SimClustersANNConfig {
candidateEmbeddingType = EmbeddingType.LogFavBasedEvergreenTweet, candidateEmbeddingType = EmbeddingType.LogFavBasedEvergreenTweet,
maxTweetCandidateAge = 48.hours maxTweetCandidateAge = 48.hours
) )
private val LogFavBasedUserInterestedInFromAPE_Model20m145k2020_Default = DefaultConfig.copy() private val LogFavBasedUserInterestedInFromAPE_Model20m145k2020_Default = DefaultConfig.copy()
// Chunnan's exp on maxTweetCandidateAgeDays 2 // Chunnan's exp on maxTweetCandidateAgeDays 2
@ -302,6 +304,7 @@ object SimClustersANNConfig {
candidateEmbeddingType = EmbeddingType.LogFavBasedEvergreenTweet, candidateEmbeddingType = EmbeddingType.LogFavBasedEvergreenTweet,
maxTweetCandidateAge = 48.hours maxTweetCandidateAge = 48.hours
) )
private val UserNextInterestedIn_Model20m145k2020_Default = DefaultConfig.copy() private val UserNextInterestedIn_Model20m145k2020_Default = DefaultConfig.copy()
// Chunnan's exp on maxTweetCandidateAgeDays 2 // Chunnan's exp on maxTweetCandidateAgeDays 2
@ -353,7 +356,8 @@ object SimClustersANNConfig {
candidateEmbeddingType = EmbeddingType.LogFavBasedEvergreenTweet, candidateEmbeddingType = EmbeddingType.LogFavBasedEvergreenTweet,
maxTweetCandidateAge = 48.hours maxTweetCandidateAge = 48.hours
) )
// Vincent's experiment on using FollowBasedProducer as query embedding type for UserFollow
// Vincent's experiment on using FollowBasedProducer as query embedding type for UserFollow.
private val FollowBasedProducer_Model20m145k2020_Default = private val FollowBasedProducer_Model20m145k2020_Default =
FavBasedProducer_Model20m145k2020_Default.copy() FavBasedProducer_Model20m145k2020_Default.copy()
@ -400,6 +404,7 @@ object SimClustersANNConfig {
candidateEmbeddingType = EmbeddingType.LogFavBasedEvergreenTweet, candidateEmbeddingType = EmbeddingType.LogFavBasedEvergreenTweet,
maxTweetCandidateAge = 48.hours maxTweetCandidateAge = 48.hours
) )
val DefaultConfigMappings: Map[String, SimClustersANNConfig] = Map( val DefaultConfigMappings: Map[String, SimClustersANNConfig] = Map(
"FavBasedProducer_Model20m145k2020_Default" -> FavBasedProducer_Model20m145k2020_Default, "FavBasedProducer_Model20m145k2020_Default" -> FavBasedProducer_Model20m145k2020_Default,
"FavBasedProducer_Model20m145k2020_20220617_06" -> FavBasedProducer_Model20m145k2020_20220617_06, "FavBasedProducer_Model20m145k2020_20220617_06" -> FavBasedProducer_Model20m145k2020_20220617_06,

View File

@ -3,22 +3,22 @@ package com.twitter.cr_mixer.config
import com.twitter.util.Duration import com.twitter.util.Duration
case class TimeoutConfig( case class TimeoutConfig(
/* Default timeouts for candidate generator */ // Default timeouts for candidate generator.
serviceTimeout: Duration, serviceTimeout: Duration,
signalFetchTimeout: Duration, signalFetchTimeout: Duration,
similarityEngineTimeout: Duration, similarityEngineTimeout: Duration,
annServiceClientTimeout: Duration, annServiceClientTimeout: Duration,
/* For Uteg Candidate Generator */ // For Uteg Candidate Generator.
utegSimilarityEngineTimeout: Duration, utegSimilarityEngineTimeout: Duration,
/* For User State Store */ // For User State Store.
userStateUnderlyingStoreTimeout: Duration, userStateUnderlyingStoreTimeout: Duration,
userStateStoreTimeout: Duration, userStateStoreTimeout: Duration,
/* For FRS based tweets */ // For FRS based tweets.
// Timeout passed to EarlyBird server // Timeout passed to EarlyBird server.
earlybirdServerTimeout: Duration, earlybirdServerTimeout: Duration,
// Timeout set on CrMixer side // Timeout set on CrMixer side
earlybirdSimilarityEngineTimeout: Duration, earlybirdSimilarityEngineTimeout: Duration,
frsBasedTweetEndpointTimeout: Duration, frsBasedTweetEndpointTimeout: Duration,
topicTweetEndpointTimeout: Duration, topicTweetEndpointTimeout: Duration,
// Timeout Settings for Navi gRPC Client // Timeout Settings for Navi gRPC Client.
naviRequestTimeout: Duration) naviRequestTimeout: Duration)

View File

@ -101,7 +101,7 @@ class CrMixerThriftController @Inject() (
ExceptionUtils.getStackTrace(e) ExceptionUtils.getStackTrace(e)
).mkString("\n") ).mkString("\n")
/** * /**
* We chose logger.info() here to print message instead of logger.error since that * We chose logger.info() here to print message instead of logger.error since that
* logger.error sometimes suppresses detailed stacktrace. * logger.error sometimes suppresses detailed stacktrace.
*/ */
@ -109,8 +109,7 @@ class CrMixerThriftController @Inject() (
} }
private def generateRequestUUID(): Long = { private def generateRequestUUID(): Long = {
/**
/** *
* We generate unique UUID via bitwise operations. See the below link for more: * We generate unique UUID via bitwise operations. See the below link for more:
* https://stackoverflow.com/questions/15184820/how-to-generate-unique-positive-long-using-uuid * https://stackoverflow.com/questions/15184820/how-to-generate-unique-positive-long-using-uuid
*/ */
@ -119,7 +118,6 @@ class CrMixerThriftController @Inject() (
handle(t.CrMixer.GetTweetRecommendations) { args: t.CrMixer.GetTweetRecommendations.Args => handle(t.CrMixer.GetTweetRecommendations) { args: t.CrMixer.GetTweetRecommendations.Args =>
val endpointName = "getTweetRecommendations" val endpointName = "getTweetRecommendations"
val requestUUID = generateRequestUUID() val requestUUID = generateRequestUUID()
val startTime = Time.now.inMilliseconds val startTime = Time.now.inMilliseconds
val userId = args.request.clientContext.userId.getOrElse( val userId = args.request.clientContext.userId.getOrElse(
@ -168,10 +166,9 @@ class CrMixerThriftController @Inject() (
Future(CrMixerTweetResponse(Seq.empty)) Future(CrMixerTweetResponse(Seq.empty))
} }
} }
} }
/** * /**
* GetRelatedTweetsForQueryTweet and GetRelatedTweetsForQueryAuthor are essentially * GetRelatedTweetsForQueryTweet and GetRelatedTweetsForQueryAuthor are essentially
* doing very similar things, except that one passes in TweetId which calls TweetBased engine, * doing very similar things, except that one passes in TweetId which calls TweetBased engine,
* and the other passes in AuthorId which calls ProducerBased engine. * and the other passes in AuthorId which calls ProducerBased engine.
@ -221,7 +218,6 @@ class CrMixerThriftController @Inject() (
Future(RelatedTweetResponse(Seq.empty)) Future(RelatedTweetResponse(Seq.empty))
} }
} }
} }
private def getRelatedVideoTweets( private def getRelatedVideoTweets(
@ -330,7 +326,6 @@ class CrMixerThriftController @Inject() (
Future(AdsResponse(Seq.empty)) Future(AdsResponse(Seq.empty))
} }
} }
} }
private def buildCrCandidateGeneratorQuery( private def buildCrCandidateGeneratorQuery(
@ -338,7 +333,6 @@ class CrMixerThriftController @Inject() (
requestUUID: Long, requestUUID: Long,
userId: Long userId: Long
): Future[CrCandidateGeneratorQuery] = { ): Future[CrCandidateGeneratorQuery] = {
val product = thriftRequest.product val product = thriftRequest.product
val productContext = thriftRequest.productContext val productContext = thriftRequest.productContext
val scopedStats = statsReceiver val scopedStats = statsReceiver
@ -357,7 +351,7 @@ class CrMixerThriftController @Inject() (
userState userState
) )
// Specify product-specific behavior mapping here // Specify product-specific behavior mapping here.
val maxNumResults = (product, productContext) match { val maxNumResults = (product, productContext) match {
case (t.Product.Home, Some(t.ProductContext.HomeContext(homeContext))) => case (t.Product.Home, Some(t.ProductContext.HomeContext(homeContext))) =>
homeContext.maxResults.getOrElse(9999) homeContext.maxResults.getOrElse(9999)
@ -392,7 +386,6 @@ class CrMixerThriftController @Inject() (
thriftRequest: RelatedTweetRequest, thriftRequest: RelatedTweetRequest,
requestUUID: Long requestUUID: Long
): Future[RelatedTweetCandidateGeneratorQuery] = { ): Future[RelatedTweetCandidateGeneratorQuery] = {
val product = thriftRequest.product val product = thriftRequest.product
val scopedStats = statsReceiver val scopedStats = statsReceiver
.scope(product.toString).scope("RelatedTweetRequest") .scope(product.toString).scope("RelatedTweetRequest")
@ -409,8 +402,8 @@ class CrMixerThriftController @Inject() (
thriftRequest.product, thriftRequest.product,
userState) userState)
// Specify product-specific behavior mapping here // Specify product-specific behavior mapping here.
// Currently, Home takes 10, and RUX takes 100 // Currently, Home takes 10, and RUX takes 100.
val maxNumResults = params(RelatedTweetGlobalParams.MaxCandidatesPerRequestParam) val maxNumResults = params(RelatedTweetGlobalParams.MaxCandidatesPerRequestParam)
RelatedTweetCandidateGeneratorQuery( RelatedTweetCandidateGeneratorQuery(
@ -458,7 +451,6 @@ class CrMixerThriftController @Inject() (
thriftRequest: RelatedVideoTweetRequest, thriftRequest: RelatedVideoTweetRequest,
requestUUID: Long requestUUID: Long
): Future[RelatedVideoTweetCandidateGeneratorQuery] = { ): Future[RelatedVideoTweetCandidateGeneratorQuery] = {
val product = thriftRequest.product val product = thriftRequest.product
val scopedStats = statsReceiver val scopedStats = statsReceiver
.scope(product.toString).scope("RelatedVideoTweetRequest") .scope(product.toString).scope("RelatedVideoTweetRequest")
@ -487,14 +479,12 @@ class CrMixerThriftController @Inject() (
requestUUID = requestUUID requestUUID = requestUUID
) )
} }
} }
private def buildUtegTweetQuery( private def buildUtegTweetQuery(
thriftRequest: UtegTweetRequest, thriftRequest: UtegTweetRequest,
requestUUID: Long requestUUID: Long
): Future[UtegTweetCandidateGeneratorQuery] = { ): Future[UtegTweetCandidateGeneratorQuery] = {
val userId = thriftRequest.clientContext.userId.getOrElse( val userId = thriftRequest.clientContext.userId.getOrElse(
throw new IllegalArgumentException("userId must be present in the Thrift clientContext") throw new IllegalArgumentException("userId must be present in the Thrift clientContext")
) )
@ -536,7 +526,6 @@ class CrMixerThriftController @Inject() (
requestUUID = requestUUID requestUUID = requestUUID
) )
} }
} }
private def buildTopicTweetQuery( private def buildTopicTweetQuery(
@ -550,7 +539,7 @@ class CrMixerThriftController @Inject() (
val product = thriftRequest.product val product = thriftRequest.product
val productContext = thriftRequest.productContext val productContext = thriftRequest.productContext
// Specify product-specific behavior mapping here // Specify product-specific behavior mapping here.
val isVideoOnly = (product, productContext) match { val isVideoOnly = (product, productContext) match {
case (t.Product.ExploreTopics, Some(t.ProductContext.ExploreContext(context))) => case (t.Product.ExploreTopics, Some(t.ProductContext.ExploreContext(context))) =>
context.isVideoOnly context.isVideoOnly
@ -646,7 +635,6 @@ class CrMixerThriftController @Inject() (
private def buildThriftResponse( private def buildThriftResponse(
candidates: Seq[RankedCandidate] candidates: Seq[RankedCandidate]
): CrMixerTweetResponse = { ): CrMixerTweetResponse = {
val tweets = candidates.map { candidate => val tweets = candidates.map { candidate =>
TweetRecommendation( TweetRecommendation(
tweetId = candidate.tweetId, tweetId = candidate.tweetId,
@ -663,7 +651,7 @@ class CrMixerThriftController @Inject() (
private def scribeTweetScoreFunnelSeries( private def scribeTweetScoreFunnelSeries(
candidates: Seq[RankedCandidate] candidates: Seq[RankedCandidate]
): Seq[RankedCandidate] = { ): Seq[RankedCandidate] = {
// 202210210901 is a random number for code search of Lensview // 202210210901 is a random number for code search of Lensview.
tweetScoreFunnelSeries.startNewSpan( tweetScoreFunnelSeries.startNewSpan(
name = "GetTweetRecommendationsTopLevelTweetSimilarityEngineType", name = "GetTweetRecommendationsTopLevelTweetSimilarityEngineType",
codePtr = 202210210901L) { codePtr = 202210210901L) {
@ -734,7 +722,6 @@ class CrMixerThriftController @Inject() (
request: CrMixerTweetRequest, request: CrMixerTweetRequest,
response: Future[CrMixerTweetResponse] response: Future[CrMixerTweetResponse]
): Unit = { ): Unit = {
val userId = request.clientContext.userId.getOrElse( val userId = request.clientContext.userId.getOrElse(
throw new IllegalArgumentException( throw new IllegalArgumentException(
"userId must be present in getTweetRecommendations() Thrift clientContext")) "userId must be present in getTweetRecommendations() Thrift clientContext"))