[docx] split commit for file 800

Signed-off-by: Ari Archer <ari.web.xyz@gmail.com>
This commit is contained in:
Ari Archer 2024-01-23 19:05:33 +02:00
parent 78b3118da4
commit eb3e2f9bcf
No known key found for this signature in database
GPG Key ID: A50D5B4B599AF8A2
397 changed files with 0 additions and 10228 deletions

View File

@ -1,65 +0,0 @@
package com.twitter.cr_mixer.source_signal
import com.twitter.cr_mixer.config.TimeoutConfig
import com.twitter.cr_mixer.model.ModuleNames
import com.twitter.cr_mixer.model.SourceInfo
import com.twitter.cr_mixer.param.FrsParams
import com.twitter.cr_mixer.param.GlobalParams
import com.twitter.cr_mixer.source_signal.FrsStore.FrsQueryResult
import com.twitter.cr_mixer.source_signal.SourceFetcher.FetcherQuery
import com.twitter.cr_mixer.thriftscala.SourceType
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.simclusters_v2.common.UserId
import com.twitter.simclusters_v2.thriftscala.InternalId
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Future
import javax.inject.Singleton
import javax.inject.Inject
import javax.inject.Named
/**
 * Signal fetcher backed by the FRS store. Every user returned by the store for the
 * requesting user is turned into a [[SourceInfo]] of type `SourceType.FollowRecommendation`.
 *
 * @param frsStore      store queried with a [[FrsStore.Query]]
 * @param timeoutConfig timeout settings consumed by the parent fetcher
 * @param globalStats   root stats receiver; scoped by this fetcher's identifier
 */
@Singleton
case class FrsSourceSignalFetcher @Inject() (
  @Named(ModuleNames.FrsStore) frsStore: ReadableStore[FrsStore.Query, Seq[FrsQueryResult]],
  override val timeoutConfig: TimeoutConfig,
  globalStats: StatsReceiver)
    extends SourceSignalFetcher {

  override protected val stats: StatsReceiver = globalStats.scope(identifier)

  // The raw signal handled by this fetcher is simply the recommended user's id.
  override type SignalConvertType = UserId

  /** The source is gated by `FrsParams.EnableSourceParam`. */
  override def isEnabled(query: FetcherQuery): Boolean =
    query.params(FrsParams.EnableSourceParam)

  /**
   * Queries the FRS store for the requesting user, asking for at most
   * `GlobalParams.UnifiedMaxSourceKeyNum` results, and converts the returned user ids
   * into source infos. A `None` from the store is passed through unchanged.
   */
  override def fetchAndProcess(query: FetcherQuery): Future[Option[Seq[SourceInfo]]] = {
    val frsQuery =
      FrsStore.Query(query.userId, query.params(GlobalParams.UnifiedMaxSourceKeyNum))

    frsStore.get(frsQuery).map { resultsOpt =>
      resultsOpt.map { results =>
        val recommendedUserIds = results.map(_.userId)
        convertSourceInfo(SourceType.FollowRecommendation, recommendedUserIds)
      }
    }
  }

  /**
   * Wraps each user id in a [[SourceInfo]] carrying the given source type.
   * No event time is attached to these signals.
   */
  override def convertSourceInfo(
    sourceType: SourceType,
    signals: Seq[SignalConvertType]
  ): Seq[SourceInfo] =
    signals.map { recommendedUserId =>
      SourceInfo(
        sourceType = sourceType,
        internalId = InternalId.UserId(recommendedUserId),
        sourceEventTime = None
      )
    }
}

View File

@ -1,81 +0,0 @@
package com.twitter.cr_mixer.source_signal
import com.twitter.cr_mixer.param.decider.CrMixerDecider
import com.twitter.cr_mixer.param.decider.DeciderConstants
import com.twitter.cr_mixer.source_signal.FrsStore.Query
import com.twitter.cr_mixer.source_signal.FrsStore.FrsQueryResult
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.follow_recommendations.thriftscala.ClientContext
import com.twitter.follow_recommendations.thriftscala.DisplayLocation
import com.twitter.follow_recommendations.thriftscala.FollowRecommendationsThriftService
import com.twitter.follow_recommendations.thriftscala.Recommendation
import com.twitter.follow_recommendations.thriftscala.RecommendationRequest
import com.twitter.storehaus.ReadableStore
import javax.inject.Singleton
import com.twitter.simclusters_v2.common.UserId
import com.twitter.util.Future
/**
 * Readable store that fetches user recommendations from the Follow Recommendations
 * thrift service and exposes them as [[FrsQueryResult]]s.
 *
 * @param frsClient     thrift client used to request recommendations
 * @param statsReceiver stats receiver handed to this store
 * @param decider       decider consulted before any traffic is sent to the service
 */
@Singleton
case class FrsStore(
  frsClient: FollowRecommendationsThriftService.MethodPerEndpoint,
  statsReceiver: StatsReceiver,
  decider: CrMixerDecider)
    extends ReadableStore[Query, Seq[FrsQueryResult]] {

  /**
   * Returns the user recommendations for `query`, or `None` without calling the service
   * when the `enableFRSTrafficDeciderKey` decider is not available.
   * Recommendations that are not `Recommendation.User` are dropped.
   */
  override def get(query: Query): Future[Option[Seq[FrsQueryResult]]] = {
    if (!decider.isAvailable(DeciderConstants.enableFRSTrafficDeciderKey)) {
      Future.None
    } else {
      val request = buildFollowRecommendationRequest(query)
      frsClient.getRecommendations(request).map { response =>
        Some(response.recommendations.collect {
          case recommendation: Recommendation.User =>
            val scoringDetails = recommendation.user.scoringDetails
            val candidateSourceDetails = scoringDetails.flatMap(_.candidateSourceDetails)
            FrsQueryResult(
              userId = recommendation.user.userId,
              // A recommendation without a score is reported with a score of 0.0.
              score = scoringDetails.flatMap(_.score).getOrElse(0.0),
              primarySource = candidateSourceDetails.flatMap(_.primarySource),
              sourceWithScores =
                candidateSourceDetails.flatMap(_.candidateSourceScores).map(_.toMap)
            )
        })
      }
    }
  }

  /** Translates a store [[Query]] into the thrift request understood by the service. */
  private def buildFollowRecommendationRequest(query: Query): RecommendationRequest = {
    val clientContext = ClientContext(
      userId = Some(query.userId),
      countryCode = query.countryCodeOpt,
      languageCode = query.languageCodeOpt)

    RecommendationRequest(
      clientContext = clientContext,
      displayLocation = query.displayLocation,
      maxResults = Some(query.maxConsumerSeedsNum),
      excludedIds = Some(query.excludedUserIds)
    )
  }
}
/** Query and result types used by the [[FrsStore]] readable store. */
object FrsStore {

  /**
   * Request for follow recommendations.
   *
   * @param userId              user the recommendations are requested for
   * @param maxConsumerSeedsNum sent to the service as the maximum number of results
   * @param displayLocation     display location forwarded to the service
   * @param excludedUserIds     user ids forwarded to the service as excluded ids
   * @param languageCodeOpt     optional language code for the client context
   * @param countryCodeOpt      optional country code for the client context
   */
  case class Query(
    userId: UserId,
    maxConsumerSeedsNum: Int,
    displayLocation: DisplayLocation = DisplayLocation.ContentRecommender,
    excludedUserIds: Seq[UserId] = Seq.empty,
    languageCodeOpt: Option[String] = None,
    countryCodeOpt: Option[String] = None)

  /**
   * One recommended user.
   *
   * @param userId           id of the recommended user
   * @param score            recommendation score
   * @param primarySource    optional primary candidate source
   * @param sourceWithScores optional per-candidate-source scores
   */
  case class FrsQueryResult(
    userId: UserId,
    score: Double,
    primarySource: Option[Int],
    sourceWithScores: Option[Map[String, Double]])
}

View File

@ -1,55 +0,0 @@
package com.twitter.cr_mixer.source_signal
import com.twitter.cr_mixer.config.TimeoutConfig
import com.twitter.cr_mixer.model.GraphSourceInfo
import com.twitter.cr_mixer.model.ModuleNames
import com.twitter.cr_mixer.param.RealGraphInParams
import com.twitter.cr_mixer.source_signal.SourceFetcher.FetcherQuery
import com.twitter.cr_mixer.thriftscala.SourceType
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.simclusters_v2.common.UserId
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Future
import com.twitter.wtf.candidate.thriftscala.CandidateSeq
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
/**
 * Fetches user recommendations from In-Network RealGraph (go/realgraph) for a given userId
 * and exposes them as a [[GraphSourceInfo]] of type `SourceType.RealGraphIn`.
 */
@Singleton
case class RealGraphInSourceGraphFetcher @Inject() (
  @Named(ModuleNames.RealGraphInStore) realGraphStoreMh: ReadableStore[UserId, CandidateSeq],
  override val timeoutConfig: TimeoutConfig,
  globalStats: StatsReceiver)
    extends SourceGraphFetcher {

  override protected val stats: StatsReceiver = globalStats.scope(identifier)

  override protected val graphSourceType: SourceType = SourceType.RealGraphIn

  /** The source is gated by `RealGraphInParams.EnableSourceGraphParam`. */
  override def isEnabled(query: FetcherQuery): Boolean =
    query.params(RealGraphInParams.EnableSourceGraphParam)

  /**
   * Reads the candidates stored for the requesting user, pairs every candidate's userId
   * with its score (tracked through per-item stats), and converts the pairs into a
   * [[GraphSourceInfo]]. A `None` from the store is passed through unchanged.
   */
  override def fetchAndProcess(query: FetcherQuery): Future[Option[GraphSourceInfo]] = {
    val scoredSeeds = trackPerItemStats(query) {
      realGraphStoreMh.get(query.userId).map { candidateSeqOpt =>
        candidateSeqOpt.map { candidateSeq =>
          candidateSeq.candidates.map(candidate => (candidate.userId, candidate.score))
        }
      }
    }

    scoredSeeds.map { scoredSeedsOpt =>
      scoredSeedsOpt.map(convertGraphSourceInfo)
    }
  }
}

View File

@ -1,55 +0,0 @@
package com.twitter.cr_mixer.source_signal
import com.twitter.cr_mixer.config.TimeoutConfig
import com.twitter.cr_mixer.model.GraphSourceInfo
import com.twitter.cr_mixer.model.ModuleNames
import com.twitter.cr_mixer.param.RealGraphOonParams
import com.twitter.cr_mixer.source_signal.SourceFetcher.FetcherQuery
import com.twitter.cr_mixer.thriftscala.SourceType
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.simclusters_v2.common.UserId
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Future
import com.twitter.wtf.candidate.thriftscala.CandidateSeq
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
/**
 * Fetches user recommendations from RealGraphOON (go/realgraph) for a given userId
 * and exposes them as a [[GraphSourceInfo]] of type `SourceType.RealGraphOon`.
 */
@Singleton
case class RealGraphOonSourceGraphFetcher @Inject() (
  @Named(ModuleNames.RealGraphOonStore) realGraphOonStore: ReadableStore[UserId, CandidateSeq],
  override val timeoutConfig: TimeoutConfig,
  globalStats: StatsReceiver)
    extends SourceGraphFetcher {

  override protected val stats: StatsReceiver = globalStats.scope(identifier)

  override protected val graphSourceType: SourceType = SourceType.RealGraphOon

  /** The source is gated by `RealGraphOonParams.EnableSourceGraphParam`. */
  override def isEnabled(query: FetcherQuery): Boolean =
    query.params(RealGraphOonParams.EnableSourceGraphParam)

  /**
   * Reads the candidates stored for the requesting user, pairs every candidate's userId
   * with its score, keeps the first `RealGraphOonParams.MaxConsumerSeedsNumParam` pairs
   * (tracked through per-item stats), and converts them into a [[GraphSourceInfo]].
   * A `None` from the store is passed through unchanged.
   */
  override def fetchAndProcess(query: FetcherQuery): Future[Option[GraphSourceInfo]] = {
    val scoredSeeds = trackPerItemStats(query) {
      realGraphOonStore.get(query.userId).map { candidateSeqOpt =>
        candidateSeqOpt.map { candidateSeq =>
          val userIdsWithScores =
            candidateSeq.candidates.map(candidate => (candidate.userId, candidate.score))
          // The limit is only looked up when the store actually returned candidates.
          userIdsWithScores.take(query.params(RealGraphOonParams.MaxConsumerSeedsNumParam))
        }
      }
    }

    scoredSeeds.map { scoredSeedsOpt =>
      scoredSeedsOpt.map(convertGraphSourceInfo)
    }
  }
}

View File

@ -1,101 +0,0 @@
package com.twitter.cr_mixer.source_signal
import com.twitter.core_workflows.user_model.thriftscala.UserState
import com.twitter.cr_mixer.config.TimeoutConfig
import com.twitter.cr_mixer.source_signal.SourceFetcher.FetcherQuery
import com.twitter.simclusters_v2.common.UserId
import com.twitter.timelines.configapi.Params
import com.twitter.cr_mixer.thriftscala.{Product => TProduct}
import com.twitter.finagle.GlobalRequestTimeoutException
import com.twitter.finagle.mux.ClientDiscardedRequestException
import com.twitter.finagle.mux.ServerApplicationError
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Future
import com.twitter.util.TimeoutException
import org.apache.thrift.TApplicationException
import com.twitter.util.logging.Logging
/**
 * A SourceFetcher is a trait which, given a [[FetcherQuery]], returns [[ResultType]].
 * Its main purpose is to give every source the same fetch interface and to supply the
 * shared plumbing around it:
 * - Identification
 * - Observability
 * - Timeout settings
 * - Exception handling
 */
trait SourceFetcher[ResultType] extends ReadableStore[FetcherQuery, ResultType] with Logging {

  protected final val timer = com.twitter.finagle.util.DefaultTimer

  /** Simple class name of the concrete fetcher. */
  protected final def identifier: String = this.getClass.getSimpleName

  protected def stats: StatsReceiver

  protected def timeoutConfig: TimeoutConfig

  /**
   * Use FeatureSwitch to decide if a specific source is enabled.
   */
  def isEnabled(query: FetcherQuery): Boolean

  /**
   * Fetches the raw sources and processes them.
   * Custom stats tracking can be added depending on the type of ResultType.
   */
  def fetchAndProcess(query: FetcherQuery): Future[Option[ResultType]]

  /**
   * Side-effect function to track stats for signal fetching and processing.
   */
  def trackStats(
    query: FetcherQuery
  )(
    func: => Future[Option[ResultType]]
  ): Future[Option[ResultType]]

  /**
   * Entry point called by the top level class to fetch sources. Runs the pipeline that
   * fetches, processes and transforms the sources; stats, timeout control and exception
   * handling all live here.
   *
   * A disabled source and any failed fetch both yield `Future.None`.
   */
  override def get(query: FetcherQuery): Future[Option[ResultType]] = {
    val scopedStats = stats.scope(query.product.originalName)

    if (!isEnabled(query)) {
      scopedStats.counter("gate_disabled").incr()
      Future.None
    } else {
      scopedStats.counter("gate_enabled").incr()
      trackStats(query)(fetchAndProcess(query))
        .raiseWithin(timeoutConfig.signalFetchTimeout)(timer)
        .onFailure { failure =>
          // One counter per exception class, e.g. exceptions/TimeoutException.
          scopedStats.scope("exceptions").counter(failure.getClass.getSimpleName).incr()
        }
        .rescue {
          // These failure types are swallowed without logging.
          case _: TimeoutException | _: GlobalRequestTimeoutException |
              _: TApplicationException | _: ClientDiscardedRequestException |
              _: ServerApplicationError // TApplicationException inside
              =>
            Future.None
          // Anything else is logged before being swallowed.
          case unexpected =>
            logger.info(unexpected)
            Future.None
        }
    }
  }
}
/** Types shared by all [[SourceFetcher]] implementations. */
object SourceFetcher {

  /**
   * The single input type shared by every SourceFetcher.
   *
   * @param userId    id of the requesting user
   * @param product   product the request is made for
   * @param userState user state of the requesting user
   * @param params    parameters the fetchers read their settings from
   */
  case class FetcherQuery(
    userId: UserId,
    product: TProduct,
    userState: UserState,
    params: Params)
}

View File

@ -1,70 +0,0 @@
package com.twitter.cr_mixer.source_signal
import com.twitter.cr_mixer.model.GraphSourceInfo
import com.twitter.cr_mixer.source_signal.SourceFetcher.FetcherQuery
import com.twitter.cr_mixer.thriftscala.SourceType
import com.twitter.frigate.common.util.StatsUtil
import com.twitter.simclusters_v2.common.UserId
import com.twitter.util.Future
/**
 * A SourceGraphFetcher is a trait that extends from `SourceFetcher`
 * and is specialized in tackling User Graph (eg., RealGraphOon, FRS) fetch.
 *
 * The [[ResultType]] of a SourceGraphFetcher is a `GraphSourceInfo` which contains a
 * userSeedSet. When we pass in userId, the underlying store returns one GraphSourceInfo.
 */
trait SourceGraphFetcher extends SourceFetcher[GraphSourceInfo] {

  protected final val DefaultSeedScore = 1.0

  protected def graphSourceType: SourceType

  /**
   * RawDataType contains a consumer's seed UserId and a score (weight).
   */
  protected type RawDataType = (UserId, Double)

  /**
   * Tracks option stats for `func` under two scopes: the product name, and the product
   * name further scoped by the user state.
   */
  def trackStats(
    query: FetcherQuery
  )(
    func: => Future[Option[GraphSourceInfo]]
  ): Future[Option[GraphSourceInfo]] = {
    val byProduct = stats.scope(query.product.originalName)
    val byProductAndUserState = byProduct.scope(query.userState.toString)

    StatsUtil.trackOptionStats(byProduct) {
      StatsUtil.trackOptionStats(byProductAndUserState)(func)
    }
  }

  /**
   * Tracks per item stats on the fetched graph results, under the same two scopes as
   * [[trackStats]].
   */
  def trackPerItemStats(
    query: FetcherQuery
  )(
    func: => Future[Option[Seq[RawDataType]]]
  ): Future[Option[Seq[RawDataType]]] = {
    val byProduct = stats.scope(query.product.originalName)
    val byProductAndUserState = byProduct.scope(query.userState.toString)

    StatsUtil.trackOptionItemsStats(byProduct) {
      StatsUtil.trackOptionItemsStats(byProductAndUserState)(func)
    }
  }

  /**
   * Convert Seq[RawDataType] into GraphSourceInfo. The pairs are collected into a map
   * keyed by user id, tagged with this fetcher's `graphSourceType`.
   */
  protected final def convertGraphSourceInfo(
    userWithScores: Seq[RawDataType]
  ): GraphSourceInfo =
    GraphSourceInfo(
      sourceType = graphSourceType,
      seedWithScores = userWithScores.toMap
    )
}

View File

@ -1,68 +0,0 @@
package com.twitter.cr_mixer.source_signal
import com.twitter.core_workflows.user_model.thriftscala.UserState
import com.twitter.cr_mixer.model.GraphSourceInfo
import com.twitter.cr_mixer.model.SourceInfo
import com.twitter.cr_mixer.source_signal.SourceFetcher.FetcherQuery
import com.twitter.cr_mixer.thriftscala.SourceType
import com.twitter.cr_mixer.thriftscala.{Product => TProduct}
import com.twitter.simclusters_v2.common.UserId
import com.twitter.timelines.configapi
import com.twitter.util.Future
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Fans a request out to all source signal fetchers and source graph fetchers and
 * gathers their results.
 */
@Singleton
case class SourceInfoRouter @Inject() (
  ussSourceSignalFetcher: UssSourceSignalFetcher,
  frsSourceSignalFetcher: FrsSourceSignalFetcher,
  frsSourceGraphFetcher: FrsSourceGraphFetcher,
  realGraphOonSourceGraphFetcher: RealGraphOonSourceGraphFetcher,
  realGraphInSourceGraphFetcher: RealGraphInSourceGraphFetcher) {

  /**
   * Builds one [[FetcherQuery]] from the arguments and runs it against every fetcher.
   *
   * @return the set of fetched source signals, paired with a map from source type name
   *         to the (optional) graph fetched for that source type
   */
  def get(
    userId: UserId,
    product: TProduct,
    userState: UserState,
    params: configapi.Params
  ): Future[(Set[SourceInfo], Map[String, Option[GraphSourceInfo]])] = {
    val fetcherQuery = FetcherQuery(userId, product, userState, params)
    val sourceSignals = getSourceSignals(fetcherQuery)
    val sourceGraphs = getSourceGraphs(fetcherQuery)
    Future.join(sourceSignals, sourceGraphs)
  }

  /** Merges the USS and FRS signals into one set; a missing result counts as empty. */
  private def getSourceSignals(fetcherQuery: FetcherQuery): Future[Set[SourceInfo]] = {
    val ussSignalsFut = ussSourceSignalFetcher.get(fetcherQuery)
    val frsSignalsFut = frsSourceSignalFetcher.get(fetcherQuery)

    Future.join(ussSignalsFut, frsSignalsFut).map {
      case (ussSignalsOpt, frsSignalsOpt) =>
        val ussSignals = ussSignalsOpt.getOrElse(Seq.empty)
        val frsSignals = frsSignalsOpt.getOrElse(Seq.empty)
        (ussSignals ++ frsSignals).toSet
    }
  }

  /** Fetches every source graph and indexes the results by source type name. */
  private def getSourceGraphs(
    fetcherQuery: FetcherQuery
  ): Future[Map[String, Option[GraphSourceInfo]]] = {
    val frsGraphFut = frsSourceGraphFetcher.get(fetcherQuery)
    val realGraphOonFut = realGraphOonSourceGraphFetcher.get(fetcherQuery)
    val realGraphInFut = realGraphInSourceGraphFetcher.get(fetcherQuery)

    Future.join(frsGraphFut, realGraphOonFut, realGraphInFut).map {
      case (frsGraphOpt, realGraphOonGraphOpt, realGraphInGraphOpt) =>
        Map(
          SourceType.FollowRecommendation.name -> frsGraphOpt,
          SourceType.RealGraphOon.name -> realGraphOonGraphOpt,
          SourceType.RealGraphIn.name -> realGraphInGraphOpt
        )
    }
  }
}

View File

@ -1,45 +0,0 @@
package com.twitter.cr_mixer.source_signal
import com.twitter.cr_mixer.model.SourceInfo
import com.twitter.cr_mixer.source_signal.SourceFetcher.FetcherQuery
import com.twitter.cr_mixer.thriftscala.SourceType
import com.twitter.frigate.common.util.StatsUtil
import com.twitter.util.Future
/**
 * A SourceSignalFetcher is a trait that extends from `SourceFetcher`
 * and is specialized in tackling Signals (eg., USS, FRS) fetch.
 * Currently, we define Signals as (but not limited to) a set of past engagements that
 * the user makes, such as RecentFav, RecentFollow, etc.
 *
 * The [[ResultType]] of a SourceSignalFetcher is `Seq[SourceInfo]`. When we pass in userId,
 * the underlying store returns a list of signals.
 */
trait SourceSignalFetcher extends SourceFetcher[Seq[SourceInfo]] {

  /** Raw signal type that a concrete fetcher converts into [[SourceInfo]]. */
  protected type SignalConvertType

  /**
   * Tracks option item stats for `func` under two scopes: the product name, and the
   * product name further scoped by the user state.
   */
  def trackStats(
    query: FetcherQuery
  )(
    func: => Future[Option[Seq[SourceInfo]]]
  ): Future[Option[Seq[SourceInfo]]] = {
    val byProduct = stats.scope(query.product.originalName)
    val byProductAndUserState = byProduct.scope(query.userState.toString)

    StatsUtil.trackOptionItemsStats(byProduct) {
      StatsUtil.trackOptionItemsStats(byProductAndUserState)(func)
    }
  }

  /**
   * Convert a list of Signals of type [[SignalConvertType]] into SourceInfo.
   */
  def convertSourceInfo(
    sourceType: SourceType,
    signals: Seq[SignalConvertType]
  ): Seq[SourceInfo]
}

View File

@ -1,160 +0,0 @@
package com.twitter.cr_mixer.source_signal
import com.twitter.cr_mixer.config.TimeoutConfig
import com.twitter.cr_mixer.model.ModuleNames
import com.twitter.cr_mixer.model.SourceInfo
import com.twitter.cr_mixer.thriftscala.SourceType
import com.twitter.cr_mixer.source_signal.SourceFetcher.FetcherQuery
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.storehaus.ReadableStore
import com.twitter.usersignalservice.thriftscala.{Signal => UssSignal}
import com.twitter.usersignalservice.thriftscala.SignalType
import com.twitter.frigate.common.util.StatsUtil.Size
import com.twitter.frigate.common.util.StatsUtil.Success
import com.twitter.frigate.common.util.StatsUtil.Empty
import com.twitter.util.Future
import com.twitter.util.Time
import javax.inject.Singleton
import javax.inject.Inject
import javax.inject.Named
/**
 * Signal fetcher backed by the USS store. The store returns signals grouped by
 * [[SignalType]]; each group is mapped onto a CrMixer [[SourceType]] and converted into
 * [[SourceInfo]]s. Groups whose signal type has no mapping are dropped.
 */
@Singleton
case class UssSourceSignalFetcher @Inject() (
  @Named(ModuleNames.UssStore) ussStore: ReadableStore[
    UssStore.Query,
    Seq[(SignalType, Seq[UssSignal])]
  ],
  override val timeoutConfig: TimeoutConfig,
  globalStats: StatsReceiver)
    extends SourceSignalFetcher {

  override protected val stats: StatsReceiver = globalStats.scope(identifier)

  override type SignalConvertType = UssSignal

  // Always enable the USS call. We have fine-grained FS to decide which signal to fetch.
  override def isEnabled(query: FetcherQuery): Boolean = true

  /**
   * Fetches the raw signals, records stats for every signal type returned, and then
   * turns the signals of every mapped signal type into source infos.
   */
  override def fetchAndProcess(query: FetcherQuery): Future[Option[Seq[SourceInfo]]] = {
    val ussQuery = UssStore.Query(query.userId, query.params, query.product)

    // Fetch raw signals, tracking stats per signal type as a side effect.
    val trackedSignals = ussStore.get(ussQuery).map { signalsByTypeOpt =>
      signalsByTypeOpt.map { signalsByType =>
        signalsByType.map { entry =>
          val (signalType, signals) = entry
          trackUssSignalStatsPerSignalType(query, signalType, signals)
          entry
        }
      }
    }

    // Process signals: Seq of (SignalType, signals) => flat Seq of SourceInfo.
    trackedSignals.map { signalsByTypeOpt =>
      signalsByTypeOpt.map { signalsByType =>
        signalsByType.flatMap {
          case (signalType, ussSignals) =>
            sourceTypeFor(signalType).fold(Seq.empty[SourceInfo]) { sourceType =>
              convertSourceInfo(sourceType = sourceType, signals = ussSignals)
            }
        }
      }
    }
  }

  /**
   * Maps a USS [[SignalType]] onto the [[SourceType]] defined in CrMixer.
   * Returns `None` for signal types that have no mapping.
   */
  private def sourceTypeFor(signalType: SignalType): Option[SourceType] =
    signalType match {
      case SignalType.TweetFavorite => Some(SourceType.TweetFavorite)
      case SignalType.Retweet => Some(SourceType.Retweet)
      case SignalType.Reply => Some(SourceType.Reply)
      case SignalType.OriginalTweet => Some(SourceType.OriginalTweet)
      case SignalType.AccountFollow => Some(SourceType.UserFollow)
      case SignalType.RepeatedProfileVisit180dMinVisit6V1 |
          SignalType.RepeatedProfileVisit90dMinVisit6V1 |
          SignalType.RepeatedProfileVisit14dMinVisit2V1 =>
        Some(SourceType.UserRepeatedProfileVisit)
      case SignalType.NotificationOpenAndClickV1 => Some(SourceType.NotificationClick)
      case SignalType.TweetShareV1 => Some(SourceType.TweetShare)
      case SignalType.RealGraphOon => Some(SourceType.RealGraphOon)
      case SignalType.GoodTweetClick | SignalType.GoodTweetClick5s |
          SignalType.GoodTweetClick10s | SignalType.GoodTweetClick30s =>
        Some(SourceType.GoodTweetClick)
      case SignalType.VideoView90dPlayback50V1 => Some(SourceType.VideoTweetPlayback50)
      case SignalType.VideoView90dQualityV1 => Some(SourceType.VideoTweetQualityView)
      case SignalType.GoodProfileClick | SignalType.GoodProfileClick20s |
          SignalType.GoodProfileClick30s =>
        Some(SourceType.GoodProfileClick)
      // negative signals
      case SignalType.AccountBlock => Some(SourceType.AccountBlock)
      case SignalType.AccountMute => Some(SourceType.AccountMute)
      case SignalType.TweetReport => Some(SourceType.TweetReport)
      case SignalType.TweetDontLike => Some(SourceType.TweetDontLike)
      // Aggregated Signals
      case SignalType.TweetBasedUnifiedEngagementWeightedSignal |
          SignalType.TweetBasedUnifiedUniformSignal =>
        Some(SourceType.TweetAggregation)
      case SignalType.ProducerBasedUnifiedEngagementWeightedSignal |
          SignalType.ProducerBasedUnifiedUniformSignal =>
        Some(SourceType.ProducerAggregation)
      // Default
      case _ => None
    }

  /**
   * Converts USS signals into source infos of the given type.
   * A timestamp of 0 is treated as "no event time".
   *
   * @throws IllegalArgumentException if a signal carries no target internal id
   */
  override def convertSourceInfo(
    sourceType: SourceType,
    signals: Seq[SignalConvertType]
  ): Seq[SourceInfo] =
    signals.map { signal =>
      val targetId = signal.targetInternalId.getOrElse(
        throw new IllegalArgumentException(
          s"${sourceType.toString} Signal does not have internalId"))
      val eventTime =
        if (signal.timestamp != 0L) Some(Time.fromMilliseconds(signal.timestamp)) else None

      SourceInfo(
        sourceType = sourceType,
        internalId = targetId,
        sourceEventTime = eventTime
      )
    }

  /**
   * Records success, size and (when there are no signals) empty stats for one signal
   * type, under both the product scope and the product + user state scope.
   */
  private def trackUssSignalStatsPerSignalType(
    query: FetcherQuery,
    signalType: SignalType,
    ussSignals: Seq[UssSignal]
  ): Unit = {
    val byProduct = stats.scope(query.product.originalName)
    val byProductAndUserState = byProduct.scope(query.userState.toString)
    val signalTypeScopes = Seq(
      byProduct.scope(signalType.toString),
      byProductAndUserState.scope(signalType.toString)
    )
    val signalCount = ussSignals.size

    signalTypeScopes.foreach(_.counter(Success).incr())
    signalTypeScopes.foreach(_.stat(Size).add(signalCount))
    if (signalCount == 0) {
      signalTypeScopes.foreach(_.counter(Empty).incr())
    }
  }
}

View File

@ -1,209 +0,0 @@
package com.twitter.cr_mixer.source_signal
import com.twitter.cr_mixer.param.GlobalParams
import com.twitter.cr_mixer.param.GoodProfileClickParams
import com.twitter.cr_mixer.param.GoodTweetClickParams
import com.twitter.cr_mixer.param.RealGraphOonParams
import com.twitter.cr_mixer.param.RecentFollowsParams
import com.twitter.cr_mixer.param.RecentNegativeSignalParams
import com.twitter.cr_mixer.param.RecentNotificationsParams
import com.twitter.cr_mixer.param.RecentOriginalTweetsParams
import com.twitter.cr_mixer.param.RecentReplyTweetsParams
import com.twitter.cr_mixer.param.RecentRetweetsParams
import com.twitter.cr_mixer.param.RecentTweetFavoritesParams
import com.twitter.cr_mixer.param.RepeatedProfileVisitsParams
import com.twitter.cr_mixer.param.TweetSharesParams
import com.twitter.cr_mixer.param.UnifiedUSSSignalParams
import com.twitter.cr_mixer.param.VideoViewTweetsParams
import com.twitter.cr_mixer.source_signal.UssStore.Query
import com.twitter.cr_mixer.thriftscala.SourceType
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.simclusters_v2.common.UserId
import com.twitter.storehaus.ReadableStore
import com.twitter.usersignalservice.thriftscala.{Signal => UssSignal}
import com.twitter.usersignalservice.thriftscala.SignalType
import javax.inject.Singleton
import com.twitter.timelines.configapi
import com.twitter.timelines.configapi.Params
import com.twitter.usersignalservice.thriftscala.BatchSignalRequest
import com.twitter.usersignalservice.thriftscala.BatchSignalResponse
import com.twitter.usersignalservice.thriftscala.SignalRequest
import com.twitter.util.Future
import com.twitter.cr_mixer.thriftscala.Product
import com.twitter.usersignalservice.thriftscala.ClientIdentifier
/**
 * Readable store that asks the User Signal Service (through a strato-backed store) for a
 * batch of signals and returns them grouped by [[SignalType]].
 *
 * @param stratoStore   store that executes the batch signal request
 * @param statsReceiver stats receiver handed to this store
 */
@Singleton
case class UssStore(
  stratoStore: ReadableStore[BatchSignalRequest, BatchSignalResponse],
  statsReceiver: StatsReceiver)
    extends ReadableStore[Query, Seq[(SignalType, Seq[UssSignal])]] {

  import com.twitter.cr_mixer.source_signal.UssStore._

  /**
   * Builds one batch request containing every signal enabled by `query.params`, tagged
   * with a client identifier derived from the product, and returns the response's
   * signals as (signal type, signals) pairs.
   */
  override def get(query: Query): Future[Option[Seq[(SignalType, Seq[UssSignal])]]] = {
    // Products without a dedicated identifier are reported as Unknown.
    val ussClientIdentifier = query.product match {
      case Product.Home => ClientIdentifier.CrMixerHome
      case Product.Notifications => ClientIdentifier.CrMixerNotifications
      case Product.Email => ClientIdentifier.CrMixerEmail
      case _ => ClientIdentifier.Unknown
    }

    val batchSignalRequest = BatchSignalRequest(
      query.userId,
      buildUserSignalServiceRequests(query.params),
      Some(ussClientIdentifier))

    stratoStore.get(batchSignalRequest).map { responseOpt =>
      responseOpt.map(_.signalResponse.toSeq)
    }
  }

  /**
   * Assembles the individual signal requests. Each signal has its own enable param;
   * which positive signals are actually sent depends on
   * `UnifiedUSSSignalParams.ReplaceIndividualUSSSourcesParam`. Negative signal requests
   * are appended after the positive ones.
   */
  private def buildUserSignalServiceRequests(param: Params): Seq[SignalRequest] = {
    // Builds `request` only when `enabled` holds; `request` is by-name so that params
    // read while constructing it are only looked up for enabled signals.
    def requestWhen(enabled: Boolean)(request: => SignalRequest): Option[SignalRequest] =
      if (enabled) Some(request) else None

    val unifiedMaxSourceKeyNum = param(GlobalParams.UnifiedMaxSourceKeyNum)
    val goodTweetClickMaxSignalNum = param(GoodTweetClickParams.MaxSignalNumParam)
    val aggrTweetMaxSourceKeyNum = param(UnifiedUSSSignalParams.UnifiedTweetSourceNumberParam)
    val aggrProducerMaxSourceKeyNum =
      param(UnifiedUSSSignalParams.UnifiedProducerSourceNumberParam)

    val maybeRecentTweetFavorite =
      requestWhen(param(RecentTweetFavoritesParams.EnableSourceParam)) {
        SignalRequest(Some(unifiedMaxSourceKeyNum), SignalType.TweetFavorite)
      }

    val maybeRecentRetweet =
      requestWhen(param(RecentRetweetsParams.EnableSourceParam)) {
        SignalRequest(Some(unifiedMaxSourceKeyNum), SignalType.Retweet)
      }

    val maybeRecentReply =
      requestWhen(param(RecentReplyTweetsParams.EnableSourceParam)) {
        SignalRequest(Some(unifiedMaxSourceKeyNum), SignalType.Reply)
      }

    val maybeRecentOriginalTweet =
      requestWhen(param(RecentOriginalTweetsParams.EnableSourceParam)) {
        SignalRequest(Some(unifiedMaxSourceKeyNum), SignalType.OriginalTweet)
      }

    val maybeRecentFollow =
      requestWhen(param(RecentFollowsParams.EnableSourceParam)) {
        SignalRequest(Some(unifiedMaxSourceKeyNum), SignalType.AccountFollow)
      }

    val maybeRepeatedProfileVisits =
      requestWhen(param(RepeatedProfileVisitsParams.EnableSourceParam)) {
        SignalRequest(
          Some(unifiedMaxSourceKeyNum),
          param(RepeatedProfileVisitsParams.ProfileMinVisitType).signalType)
      }

    val maybeRecentNotifications =
      requestWhen(param(RecentNotificationsParams.EnableSourceParam)) {
        SignalRequest(Some(unifiedMaxSourceKeyNum), SignalType.NotificationOpenAndClickV1)
      }

    val maybeTweetShares =
      requestWhen(param(TweetSharesParams.EnableSourceParam)) {
        SignalRequest(Some(unifiedMaxSourceKeyNum), SignalType.TweetShareV1)
      }

    val maybeRealGraphOon =
      requestWhen(param(RealGraphOonParams.EnableSourceParam)) {
        SignalRequest(Some(unifiedMaxSourceKeyNum), SignalType.RealGraphOon)
      }

    val maybeGoodTweetClick =
      requestWhen(param(GoodTweetClickParams.EnableSourceParam)) {
        SignalRequest(
          Some(goodTweetClickMaxSignalNum),
          param(GoodTweetClickParams.ClickMinDwellTimeType).signalType)
      }

    val maybeVideoViewTweets =
      requestWhen(param(VideoViewTweetsParams.EnableSourceParam)) {
        SignalRequest(
          Some(unifiedMaxSourceKeyNum),
          param(VideoViewTweetsParams.VideoViewTweetTypeParam).signalType)
      }

    val maybeGoodProfileClick =
      requestWhen(param(GoodProfileClickParams.EnableSourceParam)) {
        SignalRequest(
          Some(unifiedMaxSourceKeyNum),
          param(GoodProfileClickParams.ClickMinDwellTimeType).signalType)
      }

    val maybeAggTweetSignal =
      requestWhen(param(UnifiedUSSSignalParams.EnableTweetAggSourceParam)) {
        SignalRequest(
          Some(aggrTweetMaxSourceKeyNum),
          param(UnifiedUSSSignalParams.TweetAggTypeParam).signalType)
      }

    val maybeAggProducerSignal =
      requestWhen(param(UnifiedUSSSignalParams.EnableProducerAggSourceParam)) {
        SignalRequest(
          Some(aggrProducerMaxSourceKeyNum),
          param(UnifiedUSSSignalParams.ProducerAggTypeParam).signalType)
      }

    // negative signals
    val negativeSignalRequests =
      if (param(RecentNegativeSignalParams.EnableSourceParam)) {
        EnabledNegativeSignalTypes
          .map(negativeSignal => SignalRequest(Some(unifiedMaxSourceKeyNum), negativeSignal))
          .toSeq
      } else {
        Seq.empty
      }

    val replaceIndividualSources =
      param(UnifiedUSSSignalParams.ReplaceIndividualUSSSourcesParam)

    val positiveSignalRequests =
      if (replaceIndividualSources) {
        Seq(
          maybeRecentOriginalTweet,
          maybeRecentNotifications,
          maybeRealGraphOon,
          maybeGoodTweetClick,
          maybeGoodProfileClick,
          maybeAggProducerSignal,
          maybeAggTweetSignal
        )
      } else {
        Seq(
          maybeRecentTweetFavorite,
          maybeRecentRetweet,
          maybeRecentReply,
          maybeRecentOriginalTweet,
          maybeRecentFollow,
          maybeRepeatedProfileVisits,
          maybeRecentNotifications,
          maybeTweetShares,
          maybeRealGraphOon,
          maybeGoodTweetClick,
          maybeVideoViewTweets,
          maybeGoodProfileClick,
          maybeAggProducerSignal,
          maybeAggTweetSignal
        )
      }

    positiveSignalRequests.flatten ++ negativeSignalRequests
  }
}
/** Query type and negative-signal constants used with the [[UssStore]] readable store. */
object UssStore {

  /**
   * Request for a user's signals.
   *
   * @param userId  user whose signals are requested
   * @param params  parameters deciding which signals are requested
   * @param product product the request is made for
   */
  case class Query(
    userId: UserId,
    params: configapi.Params,
    product: Product)

  /** Source types regarded as enabled negative sources. */
  val EnabledNegativeSourceTypes: Set[SourceType] =
    Set(SourceType.AccountBlock, SourceType.AccountMute)

  /** Signal types requested from USS when negative signals are enabled. */
  private val EnabledNegativeSignalTypes: Set[SignalType] =
    Set(SignalType.AccountBlock, SignalType.AccountMute)
}

View File

@ -1,29 +0,0 @@
# Library target built from every *.scala file in this directory.
# It enables the "fatal_warnings" compiler option set and strict_deps, and is tagged
# "bazel-compatible". `dependencies` lists the targets this library is declared to
# depend on.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/twitter/storehaus:core",
"3rdparty/jvm/org/lz4:lz4-java",
"3rdparty/src/jvm/com/twitter/storehaus:core",
"configapi/configapi-core",
"content-recommender/thrift/src/main/thrift:thrift-scala",
"cr-mixer/server/src/main/scala/com/twitter/cr_mixer/config",
"cr-mixer/server/src/main/scala/com/twitter/cr_mixer/model",
"cr-mixer/server/src/main/scala/com/twitter/cr_mixer/param",
"cr-mixer/thrift/src/main/thrift:thrift-scala",
"finatra/inject/inject-core/src/main/scala",
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/util:stats_util",
"relevance-platform/src/main/scala/com/twitter/relevance_platform/common/stats",
"src/java/com/twitter/search/common/schema/base",
"src/java/com/twitter/search/common/schema/earlybird",
"src/java/com/twitter/search/queryparser/query:core-query-nodes",
"src/java/com/twitter/search/queryparser/query/search:search-query-nodes",
"src/scala/com/twitter/simclusters_v2/common",
"src/thrift/com/twitter/core_workflows/user_model:user_model-scala",
"src/thrift/com/twitter/search:earlybird-scala",
"src/thrift/com/twitter/search/common:ranking-scala",
"src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala",
],
)

View File

@ -1,39 +0,0 @@
package com.twitter.cr_mixer.util
import com.twitter.cr_mixer.model.CandidateGenerationInfo
import com.twitter.cr_mixer.model.SourceInfo
import com.twitter.cr_mixer.thriftscala.CandidateGenerationKey
import com.twitter.cr_mixer.thriftscala.SimilarityEngine
import com.twitter.cr_mixer.thriftscala.SourceType
import com.twitter.simclusters_v2.common.UserId
import com.twitter.simclusters_v2.thriftscala.InternalId
import com.twitter.util.Time
/** Converts [[CandidateGenerationInfo]] into its thrift [[CandidateGenerationKey]] form. */
object CandidateGenerationKeyUtil {

  // Only needed to construct DefaultSourceInfo; toThrift never reads this id back.
  private val PlaceholderUserId = 0L

  // Fallback supplying the source type and event time when a candidate has no source info.
  private val DefaultSourceInfo: SourceInfo = SourceInfo(
    sourceType = SourceType.RequestUserId,
    sourceEventTime = None,
    internalId = InternalId.UserId(PlaceholderUserId)
  )

  /**
   * Builds the thrift key for a candidate.
   *
   * When the candidate carries no source info, the source type defaults to
   * `SourceType.RequestUserId`, the event time to 0 and the id to the requesting user.
   * A missing event time or model id is written as 0 / the empty string respectively.
   *
   * @param candidateGenerationInfo generation info of the candidate
   * @param requestUserId           id of the requesting user, used as the fallback id
   */
  def toThrift(
    candidateGenerationInfo: CandidateGenerationInfo,
    requestUserId: UserId
  ): CandidateGenerationKey = {
    val sourceInfoOpt = candidateGenerationInfo.sourceInfoOpt
    val sourceInfo = sourceInfoOpt.getOrElse(DefaultSourceInfo)
    val engineInfo = candidateGenerationInfo.similarityEngineInfo

    val contributingEngines = candidateGenerationInfo.contributingSimilarityEngines.map {
      engine => SimilarityEngine(engine.similarityEngineType, engine.modelId, engine.score)
    }

    CandidateGenerationKey(
      sourceType = sourceInfo.sourceType,
      sourceEventTime =
        sourceInfo.sourceEventTime.getOrElse(Time.fromMilliseconds(0L)).inMillis,
      id = sourceInfoOpt.map(_.internalId).getOrElse(InternalId.UserId(requestUserId)),
      modelId = engineInfo.modelId.getOrElse(""),
      similarityEngineType = Some(engineInfo.similarityEngineType),
      contributingSimilarityEngine = Some(contributingEngines)
    )
  }
}

View File

@ -1,180 +0,0 @@
package com.twitter.cr_mixer.util
import com.twitter.cr_mixer.model.Candidate
import com.twitter.cr_mixer.model.InitialCandidate
import com.twitter.cr_mixer.model.RankedCandidate
import com.twitter.cr_mixer.model.SourceInfo
import com.twitter.cr_mixer.param.BlenderParams.BlendGroupingMethodEnum
import com.twitter.cr_mixer.thriftscala.SimilarityEngineType
import com.twitter.simclusters_v2.thriftscala.InternalId
object CountWeightedInterleaveUtil {
/**
 * Grouping key for interleaving candidates. Candidates that map to equal keys end up in the
 * same interleaving group. All fields are optional; `toGroupingKey` decides which ones are
 * populated for a given grouping method.
 *
 * @param sourceInfoOpt optional SourceInfo, containing the source information
 * @param similarityEngineTypeOpt optional SimilarityEngineType, containing similarity engine
 * information
 * @param modelIdOpt optional modelId, containing the model ID
 * @param authorIdOpt optional authorId, containing the tweet author ID
 * @param groupIdOpt optional groupId, containing the ID corresponding to the blending group
 */
case class GroupingKey(
sourceInfoOpt: Option[SourceInfo],
similarityEngineTypeOpt: Option[SimilarityEngineType],
modelIdOpt: Option[String],
authorIdOpt: Option[Long],
groupIdOpt: Option[Int])
/**
 * Maps a candidate to the GroupingKey it is interleaved under.
 *
 * Only RankedCandidates are grouped by a feature; any other candidate type gets a key that
 * carries nothing but `groupId`. For RankedCandidates the feature is chosen by
 * `interleaveFeature` (SourceKeyDefault when it is None):
 *  - SourceKeyDefault: full SourceInfo + similarity engine type + model id
 *  - SourceTypeSimilarityEngine: like SourceKeyDefault, but the SourceInfo is reduced to its
 *    sourceType (internal id and event time are blanked out)
 *  - AuthorId: tweet author id only
 *
 * @param candidate candidate to compute the key for
 * @param interleaveFeature grouping method; defaults to SourceKeyDefault when None
 * @param groupId blending group id copied into every produced key
 * @throws UnsupportedOperationException for any other grouping method on a RankedCandidate
 */
def toGroupingKey[CandidateType <: Candidate](
  candidate: CandidateType,
  interleaveFeature: Option[BlendGroupingMethodEnum.Value],
  groupId: Option[Int],
): GroupingKey = {
  candidate match {
    case ranked: RankedCandidate =>
      // Key shared by the two similarity-engine based methods; they differ only in SourceInfo.
      def similarityEngineKey(sourceInfoOpt: Option[SourceInfo]): GroupingKey = {
        val engineInfo = ranked.reasonChosen.similarityEngineInfo
        GroupingKey(
          sourceInfoOpt = sourceInfoOpt,
          similarityEngineTypeOpt = Some(engineInfo.similarityEngineType),
          modelIdOpt = engineInfo.modelId,
          authorIdOpt = None,
          groupIdOpt = groupId
        )
      }

      interleaveFeature.getOrElse(BlendGroupingMethodEnum.SourceKeyDefault) match {
        case BlendGroupingMethodEnum.SourceKeyDefault =>
          similarityEngineKey(ranked.reasonChosen.sourceInfoOpt)

        // Some candidate sources don't have a sourceType, so it defaults to similarityEngine
        case BlendGroupingMethodEnum.SourceTypeSimilarityEngine =>
          val sourceTypeOnly = ranked.reasonChosen.sourceInfoOpt.map { sourceInfo =>
            SourceInfo(
              sourceType = sourceInfo.sourceType,
              internalId = InternalId.UserId(0),
              sourceEventTime = None)
          }
          similarityEngineKey(sourceTypeOnly)

        case BlendGroupingMethodEnum.AuthorId =>
          GroupingKey(
            sourceInfoOpt = None,
            similarityEngineTypeOpt = None,
            modelIdOpt = None,
            authorIdOpt = Some(ranked.tweetInfo.authorId),
            groupIdOpt = groupId
          )

        case _ =>
          throw new UnsupportedOperationException(
            s"Unsupported interleave feature: $interleaveFeature")
      }

    case _ =>
      GroupingKey(
        sourceInfoOpt = None,
        similarityEngineTypeOpt = None,
        modelIdOpt = None,
        authorIdOpt = None,
        groupIdOpt = groupId
      )
  }
}
/**
 * Rather than manually calculating and maintaining the weights to rank with, we instead
 * calculate the weights on the fly, based upon the frequencies of the candidates within each
 * group. To ensure that diversity of the feature is maintained, we additionally employ a
 * 'shrinkage' parameter which enforces more diversity by moving the weights closer to uniformity.
 * More details are available at go/weighted-interleave.
 *
 * Each weight is `(1 - shrinkage) * (groupSize / largestGroupSize) + shrinkage`.
 *
 * An empty input map yields an empty result (previously `.max` on the empty collection threw
 * UnsupportedOperationException).
 *
 * @param candidateSeqKeyByFeature candidate to key.
 * @param rankerWeightShrinkage value between [0, 1] with 1 being complete uniformity.
 * @return Interleaving weights keyed by feature.
 */
private def calculateWeightsKeyByFeature[CandidateType <: Candidate](
  candidateSeqKeyByFeature: Map[GroupingKey, Seq[CandidateType]],
  rankerWeightShrinkage: Double
): Map[GroupingKey, Double] = {
  if (candidateSeqKeyByFeature.isEmpty) {
    // Nothing to weigh; also avoids calling `.max` on an empty collection.
    Map.empty[GroupingKey, Double]
  } else {
    val maxNumberCandidates: Double =
      candidateSeqKeyByFeature.values.map(_.size).max.toDouble

    candidateSeqKeyByFeature.map {
      case (featureKey, candidateSeq) =>
        val observedWeight: Double = candidateSeq.size.toDouble / maxNumberCandidates
        // How much to shrink empirical estimates to 1 (Default is to make all weights 1).
        val finalWeight =
          (1.0 - rankerWeightShrinkage) * observedWeight + rankerWeightShrinkage * 1.0
        featureKey -> finalWeight
    }
  }
}
/**
 * Groups ranked candidates by the requested feature and attaches an interleaving weight to
 * every group. More details are available at go/weighted-interleave.
 *
 * Within each group the candidates are ordered by descending predictionScore.
 *
 * @param rankedCandidateSeq candidates to interleave.
 * @param rankerWeightShrinkage value between [0, 1] with 1 being complete uniformity.
 * @param interleaveFeature grouping method used to derive each candidate's GroupingKey.
 * @return Candidates grouped by feature key and with calculated interleaving weights.
 */
def buildRankedCandidatesWithWeightKeyByFeature(
  rankedCandidateSeq: Seq[RankedCandidate],
  rankerWeightShrinkage: Double,
  interleaveFeature: BlendGroupingMethodEnum.Value
): Seq[(Seq[RankedCandidate], Double)] = {
  // Re-group here to accommodate InterleaveRanker: InterleaveBlender has already dropped the
  // grouping keys and interleaves plain Seq[Seq[]]. Because the groups come out of groupBy,
  // none of them can be empty.
  val feature = Some(interleaveFeature)
  val groupedByKey: Map[GroupingKey, Seq[RankedCandidate]] =
    rankedCandidateSeq.groupBy(candidate => toGroupingKey(candidate, feature, None))

  // Weights in [0, 1] drive the weighted interleaving.
  val weightByKey: Map[GroupingKey, Double] =
    calculateWeightsKeyByFeature(groupedByKey, rankerWeightShrinkage)

  groupedByKey.map {
    case (key, group) =>
      // The default value of 1.0 ensures the group is always sampled.
      val weight = weightByKey.getOrElse(key, 1.0)
      (group.sortBy(candidate => -candidate.predictionScore), weight)
  }.toSeq
}
/**
 * Takes current grouping (as implied by the outer Seq) and computes blending weights.
 *
 * Each non-empty inner Seq becomes one group whose key carries the inner Seq's index as its
 * group id. Empty inner Seqs are skipped: they have no head candidate to derive a key from
 * (previously `.head` threw NoSuchElementException) and contribute nothing to an interleave.
 * When no non-empty group remains, an empty result is returned without computing weights.
 *
 * @param initialCandidatesSeqSeq grouped candidates to interleave.
 * @param rankerWeightShrinkage value between [0, 1] with 1 being complete uniformity.
 * @return Grouped candidates with calculated interleaving weights.
 */
def buildInitialCandidatesWithWeightKeyByFeature(
  initialCandidatesSeqSeq: Seq[Seq[InitialCandidate]],
  rankerWeightShrinkage: Double,
): Seq[(Seq[InitialCandidate], Double)] = {
  val candidateSeqKeyByFeature: Map[GroupingKey, Seq[InitialCandidate]] =
    initialCandidatesSeqSeq.zipWithIndex.map(_.swap).toMap.collect {
      // Guard against empty groups before touching `.head`.
      case (groupId, initialCandidatesSeq) if initialCandidatesSeq.nonEmpty =>
        toGroupingKey(initialCandidatesSeq.head, None, Some(groupId)) -> initialCandidatesSeq
    }

  if (candidateSeqKeyByFeature.isEmpty) {
    // No groups to weigh; return early so the weight calculation never sees an empty map.
    Seq.empty
  } else {
    // These weights [0, 1] are used to do weighted interleaving
    // The default value of 1.0 ensures the group is always sampled.
    val candidateWeightsKeyByFeature =
      calculateWeightsKeyByFeature(candidateSeqKeyByFeature, rankerWeightShrinkage)

    candidateSeqKeyByFeature.map {
      case (groupingKey, candidateSeq) =>
        Tuple2(candidateSeq, candidateWeightsKeyByFeature.getOrElse(groupingKey, 1.0))
    }.toSeq
  }
}
}

View File

@ -1,130 +0,0 @@
package com.twitter.cr_mixer.util
import com.twitter.search.common.schema.earlybird.EarlybirdFieldConstants.EarlybirdFieldConstant
import com.twitter.search.queryparser.query.search.SearchOperator
import com.twitter.search.queryparser.query.search.SearchOperatorConstants
import com.twitter.search.queryparser.query.{Query => EbQuery}
import com.twitter.search.queryparser.query.Conjunction
import scala.collection.JavaConverters._
import com.twitter.search.earlybird.thriftscala.ThriftSearchResultMetadataOptions
import com.twitter.simclusters_v2.common.TweetId
import com.twitter.search.queryparser.query.Query
import com.twitter.util.Duration
import com.twitter.search.common.query.thriftjava.thriftscala.CollectorTerminationParams
object EarlybirdSearchUtil {
// Client id string for Earlybird requests.
// NOTE(review): presumably sent as the caller identity; the usage site is not in this file.
val EarlybirdClientId: String = "cr-mixer.prod"
// Facet field names taken from EarlybirdFieldConstant.
val Mentions: String = EarlybirdFieldConstant.MENTIONS_FACET
val Hashtags: String = EarlybirdFieldConstant.HASHTAGS_FACET
// The two facets above, bundled for callers that request facets.
val FacetsToFetch: Seq[String] = Seq(Mentions, Hashtags)
// Metadata flags for search results: everything is enabled except result location and
// Lucene score.
val MetadataOptions: ThriftSearchResultMetadataOptions = ThriftSearchResultMetadataOptions(
getTweetUrls = true,
getResultLocation = false,
getLuceneScore = false,
getInReplyToStatusId = true,
getReferencedTweetAuthorId = true,
getMediaBits = true,
getAllFeatures = true,
getFromUserId = true,
returnSearchResultFeatures = true,
// Set getExclusiveConversationAuthorId in order to retrieve Exclusive / SuperFollow tweets.
getExclusiveConversationAuthorId = true
)
// Filter out retweets and replies: operands for the EXCLUDE operators built in
// CreateTweetTypesFilters.
val TweetTypesToExclude: Seq[String] =
Seq(
SearchOperatorConstants.NATIVE_RETWEETS,
SearchOperatorConstants.REPLIES)
/**
 * Builds the collector termination parameters for an Earlybird request.
 *
 * @param maxNumHitsPerShard used as maxHitsToProcess, i.e. early termination on each EB shard
 * @param processingTimeout converted to whole milliseconds (as an Int) for timeoutMs
 * @return always a defined Option wrapping the parameters
 */
def GetCollectorTerminationParams(
  maxNumHitsPerShard: Int,
  processingTimeout: Duration
): Option[CollectorTerminationParams] = {
  val timeoutInMs: Int = processingTimeout.inMilliseconds.toInt
  val terminationParams = CollectorTerminationParams(
    maxHitsToProcess = Some(maxNumHitsPerShard),
    timeoutMs = timeoutInMs
  )
  Some(terminationParams)
}
/**
 * Get EarlybirdQuery
 * Combines the tweet id range clause, the excluded-tweet-ids clause and the tweet type
 * filters into a single query. Clauses that do not apply are left out; None is returned when
 * no clause applies at all.
 *
 * @param beforeTweetIdExclusive exclusive upper bound on tweet ids, if any
 * @param afterTweetIdExclusive lower bound on tweet ids, if any
 * @param excludedTweetIds when non-empty, adds the named exclusion clause
 * @param filterOutRetweetsAndReplies whether to exclude native retweets and replies
 */
def GetEarlybirdQuery(
  beforeTweetIdExclusive: Option[TweetId],
  afterTweetIdExclusive: Option[TweetId],
  excludedTweetIds: Set[TweetId],
  filterOutRetweetsAndReplies: Boolean
): Option[EbQuery] = {
  val rangeClause = CreateRangeQuery(beforeTweetIdExclusive, afterTweetIdExclusive)
  val excludedIdsClause = CreateExcludedTweetIdsQuery(excludedTweetIds)
  val tweetTypeClause = CreateTweetTypesFilters(filterOutRetweetsAndReplies)

  CreateConjunction(Seq(rangeClause, excludedIdsClause, tweetTypeClause).flatten)
}
/**
 * Builds the tweet id range clause from the optional bounds.
 *
 * @param beforeTweetIdExclusive exclusive upper bound; emitted as MAX_ID with (id - 1)
 * @param afterTweetIdExclusive lower bound; emitted as SINCE_ID with the id unchanged
 * @return None when neither bound is given, otherwise the single clause or their conjunction
 */
def CreateRangeQuery(
  beforeTweetIdExclusive: Option[TweetId],
  afterTweetIdExclusive: Option[TweetId]
): Option[EbQuery] = {
  // MAX_ID is an inclusive value therefore we subtract 1 from beforeId.
  val maxIdOperator = beforeTweetIdExclusive.map { id =>
    new SearchOperator(SearchOperator.Type.MAX_ID, (id - 1).toString)
  }
  val sinceIdOperator = afterTweetIdExclusive.map { id =>
    new SearchOperator(SearchOperator.Type.SINCE_ID, id.toString)
  }
  CreateConjunction(maxIdOperator.toList ++ sinceIdOperator.toList)
}
/**
 * Builds the clause excluding the tweet types listed in TweetTypesToExclude.
 *
 * @param filterOutRetweetsAndReplies when false no clause is produced
 * @return a conjunction of one EXCLUDE operator per tweet type, or None when disabled
 */
def CreateTweetTypesFilters(filterOutRetweetsAndReplies: Boolean): Option[EbQuery] = {
  if (!filterOutRetweetsAndReplies) {
    None
  } else {
    val excludeOperators = TweetTypesToExclude.map { tweetType =>
      new SearchOperator(SearchOperator.Type.EXCLUDE, tweetType)
    }
    CreateConjunction(excludeOperators)
  }
}
/**
 * Combines clauses into one query.
 *
 * @param clauses clauses to AND together
 * @return None for no clauses, the clause itself for exactly one, a Conjunction otherwise
 */
def CreateConjunction(clauses: Seq[EbQuery]): Option[EbQuery] = {
  if (clauses.isEmpty) {
    None
  } else if (clauses.size == 1) {
    // A single clause needs no wrapping Conjunction.
    clauses.headOption
  } else {
    Some(new Conjunction(clauses.asJava))
  }
}
/**
 * Builds the MUST_NOT clause that refers to the excluded tweet ids by name.
 *
 * The ids themselves are not embedded in the operator: its operands are the id field name and
 * the EXCLUDE_TWEET_IDS name, the same name GetNamedDisjunctions uses as map key.
 *
 * @param tweetIds tweet ids to exclude; only their emptiness is inspected here
 * @return None when there is nothing to exclude
 */
def CreateExcludedTweetIdsQuery(tweetIds: Set[TweetId]): Option[EbQuery] = {
  if (tweetIds.isEmpty) {
    None
  } else {
    val exclusionOperator = new SearchOperator.Builder()
      .setType(SearchOperator.Type.NAMED_MULTI_TERM_DISJUNCTION)
      .addOperand(EarlybirdFieldConstant.ID_FIELD.getFieldName)
      .addOperand(EXCLUDE_TWEET_IDS)
      .setOccur(Query.Occur.MUST_NOT)
      .build()
    Some(exclusionOperator)
  }
}
/**
 * Get NamedDisjunctions with excludedTweetIds
 *
 * @return None for an empty set, otherwise a single-entry map from EXCLUDE_TWEET_IDS to the ids
 */
def GetNamedDisjunctions(excludedTweetIds: Set[TweetId]): Option[Map[String, Seq[Long]]] = {
  if (excludedTweetIds.isEmpty) None
  else createNamedDisjunctionsExcludedTweetIds(excludedTweetIds)
}

// Name shared by the exclusion operator and the named-disjunction map.
val EXCLUDE_TWEET_IDS = "exclude_tweet_ids"

// Wraps a non-empty id set into the single-entry named-disjunction map; None when empty.
private def createNamedDisjunctionsExcludedTweetIds(
  tweetIds: Set[TweetId]
): Option[Map[String, Seq[Long]]] = {
  Some(tweetIds)
    .filter(_.nonEmpty)
    .map(ids => Map(EXCLUDE_TWEET_IDS -> ids.toSeq))
}
}

View File

@ -1,160 +0,0 @@
package com.twitter.cr_mixer.util
import com.twitter.cr_mixer.model.Candidate
import com.twitter.cr_mixer.model.CandidateGenerationInfo
import com.twitter.cr_mixer.model.RankedCandidate
import com.twitter.cr_mixer.model.SourceInfo
import com.twitter.cr_mixer.thriftscala.SimilarityEngineType
import com.twitter.simclusters_v2.common.TweetId
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
object InterleaveUtil {
/**
 * Round-robin interleaving of several candidate sequences.
 *
 * The sequence at the front of the queue gives up its next candidate. If that candidate's
 * tweet id has not been emitted yet, it is appended to the result and the sequence moves to
 * the back of the queue. If it is a duplicate, it is dropped and the same sequence is asked
 * again. Exhausted sequences are removed.
 *
 * @param candidates candidates assumed to be sorted by eventTime (latest event comes first)
 * @return interleaved candidates without duplicate tweet ids
 */
def interleave[CandidateType <: Candidate](
  candidates: Seq[Seq[CandidateType]]
): Seq[CandidateType] = {
  // Work on local mutable copies so the caller's sequences are never modified.
  val pendingGroups = mutable.Queue[mutable.Queue[CandidateType]]()
  candidates.foreach { group =>
    pendingGroups += (mutable.Queue[CandidateType]() ++= group)
  }

  val emittedTweetIds = mutable.Set[TweetId]()
  val interleaved = ArrayBuffer[CandidateType]()

  while (pendingGroups.nonEmpty) {
    val currentGroup = pendingGroups.head
    if (currentGroup.isEmpty) {
      // finished processing this Seq
      pendingGroups.dequeue()
    } else {
      val next = currentGroup.dequeue()
      // `add` returns true only when the tweet id was not already present.
      if (emittedTweetIds.add(next.tweetId)) {
        interleaved += next
        // move this Seq to end
        pendingGroups.enqueue(pendingGroups.dequeue())
      }
    }
  }

  // convert result to immutable seq
  interleaved.toList
}
/**
 * Interleaves candidates by iteratively
 * 1. Adding the group's weight to its running sum and checking whether enough has
 *    accumulated to sample from the group
 * 2. If yes, taking one candidate from the Seq and adding it to the result (unless its tweet
 *    id was already emitted, in which case it is dropped and the same Seq is visited again).
 * 3. Moving this Seq to the end of the queue to process.
 *
 * We keep count of the iterations: once `maxWeightAdjustments` iterations have happened, every
 * visit samples, i.e. the interleaving becomes uniform. This also guarantees termination.
 * We keep a mutable.Set[TweetId] buffer to ensure there are no duplicates.
 *
 * @param candidatesAndWeight candidates assumed to be sorted by eventTime (latest event comes first),
 *                            along with sampling weights to help prioritize important groups.
 * @param maxWeightAdjustments Maximum number of iterations to account for weighting before
 *                             defaulting to uniform interleaving.
 * @return interleaved candidates
 */
def weightedInterleave[CandidateType <: Candidate](
  candidatesAndWeight: Seq[(Seq[CandidateType], Double)],
  maxWeightAdjustments: Int = 0
): Seq[CandidateType] = {
  // Tolerance for accumulated floating point error around 1.0 (e.g. ten additions of 0.1 sum
  // to slightly less than 1.0). The previous value `1 - 1e-30` is exactly 1.0 in double
  // precision and therefore gave no tolerance at all.
  val minWeight = 1.0 - 1e-9

  // Copy candidates into local mutable queues so the caller's sequences are never modified,
  // and pair each queue with a running weight sum used for sampling.
  val candidatesAndWeightsPerSequence: Seq[(mutable.Queue[CandidateType], InterleaveWeights)] =
    candidatesAndWeight.map {
      case (groupCandidates, weight) =>
        (mutable.Queue[CandidateType]() ++= groupCandidates, InterleaveWeights(weight, 0.0))
    }

  val seen: mutable.Set[TweetId] = mutable.Set[TweetId]()
  val candidateSeqQueue: mutable.Queue[(mutable.Queue[CandidateType], InterleaveWeights)] =
    mutable.Queue[(mutable.Queue[CandidateType], InterleaveWeights)]() ++=
      candidatesAndWeightsPerSequence
  val result: ArrayBuffer[CandidateType] = ArrayBuffer[CandidateType]()
  var numberIterations: Int = 0

  while (candidateSeqQueue.nonEmpty) {
    val (candidatesQueue, currentWeights) = candidateSeqQueue.head
    if (candidatesQueue.nonEmpty) {
      // Accumulate this group's weight for the current visit.
      currentWeights.summed_weight += currentWeights.weight
      numberIterations += 1
      if (currentWeights.summed_weight >= minWeight || numberIterations >= maxWeightAdjustments) {
        // If we sample, then adjust the counter
        currentWeights.summed_weight -= 1.0
        val candidate = candidatesQueue.dequeue()
        // `add` returns true only when the tweet id was not already present.
        if (seen.add(candidate.tweetId)) {
          result += candidate
          candidateSeqQueue.enqueue(candidateSeqQueue.dequeue()) // move this Seq to end
        }
      } else {
        candidateSeqQueue.enqueue(candidateSeqQueue.dequeue()) // move this Seq to end
      }
    } else {
      candidateSeqQueue.dequeue() // finished processing this Seq
    }
  }

  // convert result to immutable seq
  result.toList
}
/**
 * Groups ranked candidates by the GroupingKey of their chosen candidate generation info and
 * orders each group by descending predictionScore.
 *
 * @param candidates ranked candidates to group
 * @return one Seq per grouping key
 */
def buildCandidatesKeyByCGInfo(
  candidates: Seq[RankedCandidate],
): Seq[Seq[RankedCandidate]] = {
  // Re-group here to accommodate InterleaveRanker: InterleaveBlender has already dropped the
  // grouping keys and interleaves plain Seq[Seq[]]. Because the groups come out of groupBy,
  // none of them can be empty.
  candidates
    .groupBy(candidate => GroupingKey.toGroupingKey(candidate.reasonChosen))
    .map { case (_, group) => group.sortBy(candidate => -candidate.predictionScore) }
    .toSeq
}
}
/**
 * Key used by InterleaveUtil to group candidates that were produced the same way.
 *
 * @param sourceInfoOpt source information of the candidate generation, if any
 * @param similarityEngineType type of the similarity engine that produced the candidate
 * @param modelId model id reported by the similarity engine, if any
 */
case class GroupingKey(
  sourceInfoOpt: Option[SourceInfo],
  similarityEngineType: SimilarityEngineType,
  modelId: Option[String])

object GroupingKey {

  /** Extracts the grouping key from a CandidateGenerationInfo. */
  def toGroupingKey(candidateGenerationInfo: CandidateGenerationInfo): GroupingKey = {
    val engineInfo = candidateGenerationInfo.similarityEngineInfo
    GroupingKey(
      sourceInfoOpt = candidateGenerationInfo.sourceInfoOpt,
      similarityEngineType = engineInfo.similarityEngineType,
      modelId = engineInfo.modelId
    )
  }
}
/**
 * Per-group sampling state for InterleaveUtil.weightedInterleave.
 *
 * @param weight amount added to `summed_weight` each time the group is visited
 * @param summed_weight mutable running sum; reduced by 1.0 whenever the group is sampled
 */
case class InterleaveWeights(weight: Double, var summed_weight: Double)

View File

@ -1,135 +0,0 @@
package com.twitter.cr_mixer.util
import com.twitter.cr_mixer.model.RankedCandidate
import com.twitter.cr_mixer.model.SimilarityEngineInfo
import com.twitter.cr_mixer.model.SourceInfo
import com.twitter.cr_mixer.thriftscala.MetricTag
import com.twitter.cr_mixer.thriftscala.SimilarityEngineType
import com.twitter.cr_mixer.thriftscala.SourceType
object MetricTagUtil {
/**
 * Collects the distinct metric tags of a ranked candidate.
 *
 * For every potential reason three kinds of tags are gathered: one derived from the source
 * type, the ones derived from the similarity engine (including contributing engines), and one
 * derived from the source / similarity engine combination. The UserInterestedIn tag from
 * `isFromInterestedIn` is added on top.
 *
 * @param candidate ranked candidate whose potential reasons are inspected
 * @return the de-duplicated metric tags
 */
def buildMetricTags(candidate: RankedCandidate): Seq[MetricTag] = {
  val interestedInTags = isFromInterestedIn(candidate)

  val reasonTags: Set[MetricTag] = candidate.potentialReasons.flatMap { cgInfo =>
    val sourceTag =
      cgInfo.sourceInfoOpt.flatMap(sourceInfo => toMetricTagFromSource(sourceInfo.sourceType))
    val engineTags =
      toMetricTagFromSimilarityEngine(
        cgInfo.similarityEngineInfo,
        cgInfo.contributingSimilarityEngines)
    val sourceAndEngineTag = cgInfo.sourceInfoOpt.flatMap { sourceInfo =>
      toMetricTagFromSourceAndSimilarityEngine(sourceInfo, cgInfo.similarityEngineInfo)
    }
    sourceTag.toList ++ engineTags.flatten ++ sourceAndEngineTag.toList
  }.toSet

  (interestedInTags ++ reasonTags).toSeq
}
/**
 * Maps a source type to its metric tag.
 *
 * @param sourceType source type of the signal
 * @return the matching tag, or None for source types without one
 */
private def toMetricTagFromSource(sourceType: SourceType): Option[MetricTag] = {
  val tagBySourceType: PartialFunction[SourceType, MetricTag] = {
    case SourceType.TweetFavorite => MetricTag.TweetFavorite // Personalized Topics in Home
    case SourceType.Retweet => MetricTag.Retweet // Personalized Topics in Home
    case SourceType.NotificationClick => MetricTag.PushOpenOrNtabClick // Health Filter in MR
    case SourceType.OriginalTweet => MetricTag.OriginalTweet
    case SourceType.Reply => MetricTag.Reply
    case SourceType.TweetShare => MetricTag.TweetShare
    case SourceType.UserFollow => MetricTag.UserFollow
    case SourceType.UserRepeatedProfileVisit => MetricTag.UserRepeatedProfileVisit
    case SourceType.TwiceUserId => MetricTag.TwiceUserId
  }
  // `lift` yields None for every source type not listed above.
  tagBySourceType.lift(sourceType)
}
/**
 * Maps similarity engine information to metric tags.
 * If the SEInfo is built by a unified sim engine, we un-wrap the contributing sim engines and
 * tag each of them. If not, we tag the sim engine itself.
 *
 * @param seInfo (CandidateGenerationInfo.similarityEngineInfo): SimilarityEngineInfo,
 * @param cseInfo (CandidateGenerationInfo.contributingSimilarityEngines): Seq[SimilarityEngineInfo]
 * @return optional tags; empty for engine types without a tag
 */
private def toMetricTagFromSimilarityEngine(
  seInfo: SimilarityEngineInfo,
  cseInfo: Seq[SimilarityEngineInfo]
): Seq[Option[MetricTag]] = {
  seInfo.similarityEngineType match {
    // un-wrap the unified sim engine
    case SimilarityEngineType.TweetBasedUnifiedSimilarityEngine |
        SimilarityEngineType.ProducerBasedUnifiedSimilarityEngine =>
      cseInfo.flatMap { contributingSimEngine =>
        toMetricTagFromSimilarityEngine(contributingSimEngine, Seq.empty)
      }

    // SimClustersANN can either be called on its own, or be called under unified sim engine.
    // The old "UserInterestedIn" will be replaced by this. Also, OfflineTwice
    case SimilarityEngineType.SimClustersANN =>
      val modelTag = seInfo.modelId.flatMap(toMetricTagFromModelId)
      Seq(Some(MetricTag.SimClustersANN), modelTag)

    case SimilarityEngineType.ConsumerEmbeddingBasedTwHINANN =>
      Seq(Some(MetricTag.ConsumerEmbeddingBasedTwHINANN))

    case SimilarityEngineType.TwhinCollabFilter =>
      Seq(Some(MetricTag.TwhinCollabFilter))

    // In the current implementation, TweetBasedUserTweetGraph/TweetBasedTwHINANN has a tag when
    // it's either a base SE or a contributing SE. But for now they only show up in contributing SE.
    case SimilarityEngineType.TweetBasedUserTweetGraph =>
      Seq(Some(MetricTag.TweetBasedUserTweetGraph))

    case SimilarityEngineType.TweetBasedTwHINANN =>
      Seq(Some(MetricTag.TweetBasedTwHINANN))

    case _ =>
      Seq.empty
  }
}
// Compiled once instead of on every call. Used as an extractor it must match the whole
// model id, i.e. the id has to end with `_Model20m145k2020_20220819`.
private val PushOpenBasedModelRegex: scala.util.matching.Regex =
  "(.*_Model20m145k2020_20220819)".r

/**
 * pass in a model id, and match it with the metric tag type.
 *
 * @param modelId model id reported by the similarity engine
 * @return RequestHealthFilterPushOpenBasedTweetEmbedding when the id matches
 *         PushOpenBasedModelRegex, None otherwise
 */
private def toMetricTagFromModelId(
  modelId: String
): Option[MetricTag] = {
  modelId match {
    case PushOpenBasedModelRegex(_*) =>
      Some(MetricTag.RequestHealthFilterPushOpenBasedTweetEmbedding)
    case _ => None
  }
}
/**
 * Tags that depend on the combination of source type and similarity engine type.
 *
 * @param sourceInfo source information of the candidate generation
 * @param seInfo similarity engine information of the candidate generation
 * @return LookalikeUTG for a Lookalike source served by ConsumersBasedUserTweetGraph, else None
 */
private def toMetricTagFromSourceAndSimilarityEngine(
  sourceInfo: SourceInfo,
  seInfo: SimilarityEngineInfo
): Option[MetricTag] = {
  val isLookalikeSource = sourceInfo.sourceType == SourceType.Lookalike
  if (isLookalikeSource &&
    seInfo.similarityEngineType == SimilarityEngineType.ConsumersBasedUserTweetGraph) {
    Some(MetricTag.LookalikeUTG)
  } else {
    None
  }
}
/**
 * Special use case: used by Notifications team to generate the UserInterestedIn CRT push copy.
 *
 * A candidate counts as "interested in" when its chosen reason has no source info and was
 * produced by the SimClustersANN similarity engine.
 *
 * if we have different types of InterestedIn (eg. UserInterestedIn, NextInterestedIn),
 * this condition will have to be refactored to contain the real UserInterestedIn.
 *
 * @return a set holding only UserInterestedIn, or the empty set
 */
private def isFromInterestedIn(candidate: RankedCandidate): Set[MetricTag] = {
  val reason = candidate.reasonChosen
  val hasNoSource = reason.sourceInfoOpt.isEmpty
  val isSimClustersAnn =
    hasNoSource &&
      reason.similarityEngineInfo.similarityEngineType == SimilarityEngineType.SimClustersANN

  if (isSimClustersAnn) Set(MetricTag.UserInterestedIn) else Set.empty
}
}

View File

@ -1,66 +0,0 @@
package com.twitter.cr_mixer.util
import com.twitter.cr_mixer.model.CandidateGenerationInfo
import com.twitter.cr_mixer.model.RankedCandidate
import com.twitter.cr_mixer.model.SourceInfo
import com.twitter.cr_mixer.thriftscala.SourceType
import com.twitter.cr_mixer.thriftscala.TweetRecommendation
import javax.inject.Inject
import com.twitter.finagle.stats.StatsReceiver
import javax.inject.Singleton
import com.twitter.relevance_platform.common.stats.BucketTimestampStats
/**
 * Feeds the latest source signal timestamp of tweet recommendations into three
 * BucketTimestampStats instances, one each for day, hour and minute sized buckets.
 *
 * Each instance is built from a bucket width constant, a function reading
 * `latestSourceSignalTimestampInMillis` (0 when the field is absent), an optional maximum
 * taken from the companion object, and its own stats scope.
 * NOTE(review): how BucketTimestampStats turns these into counters is not visible here.
 */
@Singleton
class SignalTimestampStatsUtil @Inject() (statsReceiver: StatsReceiver) {
import SignalTimestampStatsUtil._
// Day-sized buckets, capped by SignalTimestampMaxDays, scoped "signal_timestamp_per_day".
private val signalDelayAgePerDayStats =
new BucketTimestampStats[TweetRecommendation](
BucketTimestampStats.MillisecondsPerDay,
_.latestSourceSignalTimestampInMillis.getOrElse(0),
Some(SignalTimestampMaxDays))(
statsReceiver.scope("signal_timestamp_per_day")
) // only stats past 90 days
// Hour-sized buckets, capped by SignalTimestampMaxHours, scoped "signal_timestamp_per_hour".
private val signalDelayAgePerHourStats =
new BucketTimestampStats[TweetRecommendation](
BucketTimestampStats.MillisecondsPerHour,
_.latestSourceSignalTimestampInMillis.getOrElse(0),
Some(SignalTimestampMaxHours))(
statsReceiver.scope("signal_timestamp_per_hour")
) // only stats past 24 hours
// Minute-sized buckets, capped by SignalTimestampMaxMins, scoped "signal_timestamp_per_min".
private val signalDelayAgePerMinStats =
new BucketTimestampStats[TweetRecommendation](
BucketTimestampStats.MillisecondsPerMinute,
_.latestSourceSignalTimestampInMillis.getOrElse(0),
Some(SignalTimestampMaxMins))(
statsReceiver.scope("signal_timestamp_per_min")
) // only stats past 60 minutes
/**
 * Passes the tweets through the minute, hour and day stats, in that order.
 *
 * @param tweets recommendations to record
 * @return the result of the last `count` call (the per-day stats)
 */
def statsSignalTimestamp(
tweets: Seq[TweetRecommendation],
): Seq[TweetRecommendation] = {
signalDelayAgePerMinStats.count(tweets)
signalDelayAgePerHourStats.count(tweets)
signalDelayAgePerDayStats.count(tweets)
}
}
object SignalTimestampStatsUtil {
  val SignalTimestampMaxMins = 60 // stats at most 60 mins
  val SignalTimestampMaxHours = 24 // stats at most 24 hours
  val SignalTimestampMaxDays = 90 // stats at most 90 days

  /**
   * Finds the most recent TweetFavorite signal time among a candidate's potential reasons.
   *
   * Only reasons that have a SourceInfo of type TweetFavorite with a defined event time are
   * considered.
   *
   * @param candidate ranked candidate whose potential reasons are inspected
   * @return the largest event time in milliseconds, or None when no reason qualifies
   */
  def buildLatestSourceSignalTimestamp(candidate: RankedCandidate): Option[Long] = {
    val favoriteTimestamps = candidate.potentialReasons.collect {
      case CandidateGenerationInfo(
            Some(SourceInfo(SourceType.TweetFavorite, _, Some(eventTime))),
            _,
            _) =>
        eventTime.inMilliseconds
    }

    if (favoriteTimestamps.isEmpty) None
    else Some(favoriteTimestamps.max(Ordering.Long))
  }
}

View File

@ -1,48 +0,0 @@
# Thrift library targets with base name "thrift", built from every *.thrift file in this
# directory. Bindings are generated for java, scala and strato and published under the
# cr-mixer-thrift-java / cr-mixer-thrift-scala names.
# NOTE(review): dependency_roots presumably lists the thrift targets the local includes
# resolve against; confirm against the create_thrift_libraries macro.
create_thrift_libraries(
base_name = "thrift",
sources = ["*.thrift"],
platform = "java8",
tags = ["bazel-compatible"],
dependency_roots = [
"finatra-internal/thrift/src/main/thrift",
"product-mixer/core/src/main/thrift/com/twitter/product_mixer/core:thrift",
"src/thrift/com/twitter/ads/schema:common",
"src/thrift/com/twitter/ml/api:data",
"src/thrift/com/twitter/recos:recos-common",
"src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift",
"src/thrift/com/twitter/timelines/render:thrift",
"strato/config/src/thrift/com/twitter/strato/graphql",
"strato/config/src/thrift/com/twitter/strato/graphql:api-media-graphql",
"strato/config/src/thrift/com/twitter/strato/graphql:topics-graphql",
],
generate_languages = [
"java",
"scala",
"strato",
],
provides_java_name = "cr-mixer-thrift-java",
provides_scala_name = "cr-mixer-thrift-scala",
)
# Thrift library targets with base name "cr-mixer-scribe", built from every *.thrift file in
# this directory and published as cr-mixer-scribe-java / cr-mixer-scribe-scala.
# Unlike the "thrift" target in this file, no `platform` is set and the two strato graphql
# sub-targets (api-media-graphql, topics-graphql) are not listed as dependency roots.
create_thrift_libraries(
base_name = "cr-mixer-scribe",
sources = ["*.thrift"],
tags = ["bazel-compatible"],
dependency_roots = [
"finatra-internal/thrift/src/main/thrift",
"product-mixer/core/src/main/thrift/com/twitter/product_mixer/core:thrift",
"src/thrift/com/twitter/ads/schema:common",
"src/thrift/com/twitter/ml/api:data",
"src/thrift/com/twitter/recos:recos-common",
"src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift",
"src/thrift/com/twitter/timelines/render:thrift",
"strato/config/src/thrift/com/twitter/strato/graphql",
],
generate_languages = [
"java",
"scala",
"strato",
],
provides_java_name = "cr-mixer-scribe-java",
provides_scala_name = "cr-mixer-scribe-scala",
)

Binary file not shown.

Binary file not shown.

View File

@ -1,33 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
include "product.thrift"
include "product_context.thrift"
include "com/twitter/product_mixer/core/client_context.thrift"
include "com/twitter/ads/schema/shared.thrift"
# Request for ad recommendations: client context and product are required, the product
# context and the list of tweet ids are optional.
# NOTE(review): excludedTweetIds is presumably the set of tweets to leave out of the
# response; confirm with the service implementation.
struct AdsRequest {
1: required client_context.ClientContext clientContext
2: required product.Product product
# Product-specific parameters should be placed in the Product Context
3: optional product_context.ProductContext productContext
4: optional list<i64> excludedTweetIds (personalDataType = 'TweetId')
} (persisted='true', hasPersonalData='true')
# Response wrapper around the list of recommended ad tweets.
struct AdsResponse {
1: required list<AdTweetRecommendation> ads
} (persisted='true')
# One recommended ad tweet: its tweet id, a score and, optionally, its line items.
struct AdTweetRecommendation {
1: required i64 tweetId (personalDataType = 'TweetId')
2: required double score
3: optional list<LineItemInfo> lineItems
} (persisted='true')
# Line item id together with its objective (type defined in the ads schema shared.thrift).
struct LineItemInfo {
1: required i64 lineItemId (personalDataType = 'LineItemId')
2: required shared.LineItemObjective lineItemObjective
} (persisted='true')

View File

@ -1,21 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
include "source_type.thrift"
include "com/twitter/simclusters_v2/identifier.thrift"
# A similarity engine reference: its type plus an optional model id and score.
struct SimilarityEngine {
1: required source_type.SimilarityEngineType similarityEngineType
2: optional string modelId
3: optional double score
} (persisted='true')
# Describes how a candidate was generated: the source signal (type, event time, internal id),
# the model id, and optionally the similarity engine type and the contributing engines.
# sourceEventTime and modelId are required fields, so producers have to supply a value even
# when the underlying information is absent.
struct CandidateGenerationKey {
1: required source_type.SourceType sourceType
2: required i64 sourceEventTime (personalDataType = 'PrivateTimestamp')
3: required identifier.InternalId id
4: required string modelId
5: optional source_type.SimilarityEngineType similarityEngineType
6: optional list<SimilarityEngine> contributingSimilarityEngine
} (persisted='true')

Binary file not shown.

View File

@ -1,104 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
include "ads.thrift"
include "candidate_generation_key.thrift"
include "product.thrift"
include "product_context.thrift"
include "validation.thrift"
include "metric_tags.thrift"
include "related_tweet.thrift"
include "uteg.thrift"
include "frs_based_tweet.thrift"
include "related_video_tweet.thrift"
include "topic_tweet.thrift"
include "com/twitter/product_mixer/core/client_context.thrift"
include "com/twitter/timelines/render/response.thrift"
include "finatra-thrift/finatra_thrift_exceptions.thrift"
include "com/twitter/strato/graphql/slice.thrift"
struct CrMixerTweetRequest {
1: required client_context.ClientContext clientContext
2: required product.Product product
# Product-specific parameters should be placed in the Product Context
3: optional product_context.ProductContext productContext
4: optional list<i64> excludedTweetIds (personalDataType = 'TweetId')
} (persisted='true', hasPersonalData='true')
struct TweetRecommendation {
1: required i64 tweetId (personalDataType = 'TweetId')
2: required double score
3: optional list<metric_tags.MetricTag> metricTags
# 4: the author of the tweet candidate. To be used by Content-Mixer to unblock the Hydra experiment.
4: optional i64 authorId (personalDataType = 'UserId')
# 5: extra info about candidate generation. To be used by Content-Mixer to unblock the Hydra experiment.
5: optional candidate_generation_key.CandidateGenerationKey candidateGenerationKey
# 1001: the latest timestamp of fav signals. If null, the candidate is not generated from fav signals
1001: optional i64 latestSourceSignalTimestampInMillis(personalDataType = 'PublicTimestamp')
} (persisted='true', hasPersonalData = 'true')
# Response of CrMixer.getTweetRecommendations: the list of recommended tweets.
struct CrMixerTweetResponse {
1: required list<TweetRecommendation> tweets
} (persisted='true')
# Thrift service exposing cr-mixer's recommendation endpoints.
# Each endpoint takes a single request struct and declares the same two exceptions:
# validation.ValidationExceptionList (1) and finatra_thrift_exceptions.ServerError (2).
service CrMixer {
CrMixerTweetResponse getTweetRecommendations(1: CrMixerTweetRequest request) throws (
# Validation errors - the details of which will be reported to clients on failure
1: validation.ValidationExceptionList validationErrors;
# Server errors - the details of which will not be reported to clients
2: finatra_thrift_exceptions.ServerError serverError
)
# getRelatedTweetsForQueryTweet and getRelatedTweetsForQueryAuthor do very similar things
# We can merge these two endpoints into one unified endpoint
related_tweet.RelatedTweetResponse getRelatedTweetsForQueryTweet(1: related_tweet.RelatedTweetRequest request) throws (
# Validation errors - the details of which will be reported to clients on failure
1: validation.ValidationExceptionList validationErrors;
# Server errors - the details of which will not be reported to clients
2: finatra_thrift_exceptions.ServerError serverError
)
related_tweet.RelatedTweetResponse getRelatedTweetsForQueryAuthor(1: related_tweet.RelatedTweetRequest request) throws (
# Validation errors - the details of which will be reported to clients on failure
1: validation.ValidationExceptionList validationErrors;
# Server errors - the details of which will not be reported to clients
2: finatra_thrift_exceptions.ServerError serverError
)
uteg.UtegTweetResponse getUtegTweetRecommendations(1: uteg.UtegTweetRequest request) throws (
# Validation errors - the details of which will be reported to clients on failure
1: validation.ValidationExceptionList validationErrors;
# Server errors - the details of which will not be reported to clients
2: finatra_thrift_exceptions.ServerError serverError
)
frs_based_tweet.FrsTweetResponse getFrsBasedTweetRecommendations(1: frs_based_tweet.FrsTweetRequest request) throws (
# Validation errors - the details of which will be reported to clients on failure
1: validation.ValidationExceptionList validationErrors;
# Server errors - the details of which will not be reported to clients
2: finatra_thrift_exceptions.ServerError serverError
)
related_video_tweet.RelatedVideoTweetResponse getRelatedVideoTweetsForQueryTweet(1: related_video_tweet.RelatedVideoTweetRequest request) throws (
# Validation errors - the details of which will be reported to clients on failure
1: validation.ValidationExceptionList validationErrors;
# Server errors - the details of which will not be reported to clients
2: finatra_thrift_exceptions.ServerError serverError
)
ads.AdsResponse getAdsRecommendations(1: ads.AdsRequest request) throws (
# Validation errors - the details of which will be reported to clients on failure
1: validation.ValidationExceptionList validationErrors;
# Server errors - the details of which will not be reported to clients
2: finatra_thrift_exceptions.ServerError serverError
)
topic_tweet.TopicTweetResponse getTopicTweetRecommendations(1: topic_tweet.TopicTweetRequest request) throws (
# Validation errors - the details of which will be reported to clients on failure
1: validation.ValidationExceptionList validationErrors;
# Server errors - the details of which will not be reported to clients
2: finatra_thrift_exceptions.ServerError serverError
)
}

Binary file not shown.

View File

@ -1,35 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
include "product.thrift"
include "product_context.thrift"
include "com/twitter/product_mixer/core/client_context.thrift"
# Request type of the getFrsBasedTweetRecommendations endpoint.
# clientContext and product are required; the context and both exclusion lists are optional.
struct FrsTweetRequest {
1: required client_context.ClientContext clientContext
2: required product.Product product
3: optional product_context.ProductContext productContext
# excludedUserIds - user ids to be excluded from FRS candidate generation
4: optional list<i64> excludedUserIds (personalDataType = 'UserId')
# excludedTweetIds - tweet ids to be excluded from Earlybird candidate generation
5: optional list<i64> excludedTweetIds (personalDataType = 'TweetId')
} (persisted='true', hasPersonalData='true')
# One tweet in FrsTweetResponse.tweets, together with FRS information about its author.
# Field id 3 is deliberately left unused (see the note below).
struct FrsTweet {
1: required i64 tweetId (personalDataType = 'TweetId')
2: required i64 authorId (personalDataType = 'UserId')
# skip 3 in case we need tweet score in the future
# frsPrimarySource - which FRS candidate source is the primary one to generate this author
4: optional i32 frsPrimarySource
# frsCandidateSourceScores - FRS candidate sources and the scores for this author
# for i32 to algorithm mapping, see https://sourcegraph.twitter.biz/git.twitter.biz/source/-/blob/hermit/hermit-core/src/main/scala/com/twitter/hermit/constants/AlgorithmFeedbackTokens.scala?L12
5: optional map<i32, double> frsCandidateSourceScores
# frsAuthorScore - the score of the FRS primary candidate source
# NOTE(review): this comment used to call the field "frsPrimaryScore"; the declared name is frsAuthorScore - confirm the description still matches
6: optional double frsAuthorScore
} (persisted='true', hasPersonalData = 'true')
# Response type of the getFrsBasedTweetRecommendations endpoint: the list of FrsTweet results.
struct FrsTweetResponse {
1: required list<FrsTweet> tweets
} (persisted='true')

Binary file not shown.

View File

@ -1,44 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
// NOTE: DO NOT depend on MetricTags for important ML Features or business logic.
// MetricTags are meant for stats tracking & debugging purposes ONLY.
// cr-mixer may change its definitions & how each candidate is tagged without public notice.
// NOTE: TSPS needs the caller (Home) to specify which signal it uses to make Personalized Topics
// Tags that can be attached to a candidate (e.g. TweetRecommendation.metricTags).
// Values are grouped into numeric ranges by category, as labelled by the section comments below.
enum MetricTag {
// Source Signal Tags
TweetFavorite = 0
Retweet = 1
TrafficAttribution = 2
OriginalTweet = 3
Reply = 4
TweetShare = 5
UserFollow = 101
UserRepeatedProfileVisit = 102
PushOpenOrNtabClick = 201
HomeTweetClick = 301
HomeVideoView = 302
// sim engine types
SimClustersANN = 401
TweetBasedUserTweetGraph = 402
TweetBasedTwHINANN = 403
ConsumerEmbeddingBasedTwHINANN = 404
// combined engine types
UserInterestedIn = 501 // Will deprecate soon
LookalikeUTG = 502
TwhinCollabFilter = 503
// Offline Twice
TwiceUserId = 601
// Other Metric Tags
RequestHealthFilterPushOpenBasedTweetEmbedding = 701
} (persisted='true', hasPersonalData='true')

Binary file not shown.

View File

@ -1,19 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
# In CrMixer, one org should only have one Product
# Product surfaces that call CrMixer; carried in the required `product` field of the request structs.
enum Product {
Home = 1
Notifications = 2
Email = 3
MoreTweetsModule = 4 # aka RUX
ImmersiveMediaViewer = 5
VideoCarousel = 6
ExploreTopics = 7
Ads = 8
HomeRealTime = 9 // Home Real-Time Tab is considered as a different Product surface to Home Tab. It's in early experiment phase.
TopicLandingPage = 10
HomeTopicsBackfill = 11
TopicTweetsStrato = 12
}

Binary file not shown.

View File

@ -1,21 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
// Home-specific request parameters. The only field uses id 2; no field 1 is declared.
struct HomeContext {
2: optional i32 maxResults // enabled for QualityFactor related DDGs only
} (persisted='true', hasPersonalData='false')
// Notifications-specific request parameters. Currently carries only an unused placeholder field.
struct NotificationsContext {
1: optional i32 devNull // not being used. it's a placeholder
} (persisted='true', hasPersonalData='false')
// Explore-specific request parameters.
struct ExploreContext {
// NOTE(review): going by the name, restricts results to video tweets - confirm against the server code
1: required bool isVideoOnly
} (persisted='true', hasPersonalData='false')
// Product-specific context carried in the optional `productContext` field of request structs.
// Being a Thrift union, a value holds one of the alternatives below.
union ProductContext {
1: HomeContext homeContext
2: NotificationsContext notificationsContext
3: ExploreContext exploreContext
} (persisted='true', hasPersonalData='false')

Binary file not shown.

View File

@ -1,24 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
include "product.thrift"
include "com/twitter/product_mixer/core/client_context.thrift"
include "com/twitter/simclusters_v2/identifier.thrift"
# Request type shared by the getRelatedTweetsForQueryTweet and getRelatedTweetsForQueryAuthor endpoints.
# internalId identifies the query entity; its concrete variants are defined in identifier.thrift.
struct RelatedTweetRequest {
1: required identifier.InternalId internalId
2: required product.Product product
3: required client_context.ClientContext clientContext # RUX LogOut will have clientContext.userId = None
4: optional list<i64> excludedTweetIds (personalDataType = 'TweetId')
} (persisted='true', hasPersonalData='true')
# One entry of RelatedTweetResponse.tweets. Only tweetId is required; score and authorId may be absent.
struct RelatedTweet {
1: required i64 tweetId (personalDataType = 'TweetId')
2: optional double score
3: optional i64 authorId (personalDataType = 'UserId')
} (persisted='true', hasPersonalData='true')
# Response type of the related-tweet endpoints: the list of RelatedTweet results.
struct RelatedTweetResponse {
1: required list<RelatedTweet> tweets
} (persisted='true')

View File

@ -1,23 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
include "product.thrift"
include "com/twitter/product_mixer/core/client_context.thrift"
include "com/twitter/simclusters_v2/identifier.thrift"
# Request type of the getRelatedVideoTweetsForQueryTweet endpoint.
# Declares the same four fields as RelatedTweetRequest in related_tweet.thrift.
struct RelatedVideoTweetRequest {
1: required identifier.InternalId internalId
2: required product.Product product
3: required client_context.ClientContext clientContext # RUX LogOut will have clientContext.userId = None
4: optional list<i64> excludedTweetIds (personalDataType = 'TweetId')
} (persisted='true', hasPersonalData='true')
# One entry of RelatedVideoTweetResponse.tweets. Unlike RelatedTweet, no authorId field is declared.
struct RelatedVideoTweet {
1: required i64 tweetId (personalDataType = 'TweetId')
2: optional double score
} (persisted='true', hasPersonalData='true')
# Response type of the getRelatedVideoTweetsForQueryTweet endpoint: the list of RelatedVideoTweet results.
struct RelatedVideoTweetResponse {
1: required list<RelatedVideoTweet> tweets
} (persisted='true')

Binary file not shown.

View File

@ -1,168 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
include "ads.thrift"
include "candidate_generation_key.thrift"
include "cr_mixer.thrift"
include "metric_tags.thrift"
include "product.thrift"
include "related_tweet.thrift"
include "source_type.thrift"
include "uteg.thrift"
include "com/twitter/ml/api/data.thrift"
include "com/twitter/simclusters_v2/identifier.thrift"
# Scribe record holding a list of VITTweetCandidateScribe entries for one request and user.
# Field ids jump from 3 to 7; ids 4-6 are not declared here.
struct VITTweetCandidatesScribe {
1: required i64 uuid (personalDataType = 'UniversallyUniqueIdentifierUuid') # RequestUUID - unique scribe id for every request that comes in. Same request but different stages of scribe log (FetchCandidate, Filter, etc) share the same uuid
2: required i64 userId (personalDataType = 'UserId')
3: required list<VITTweetCandidateScribe> candidates
7: required product.Product product
8: required list<ImpressesedBucketInfo> impressedBuckets
} (persisted='true', hasPersonalData = 'true')
# One candidate inside VITTweetCandidatesScribe.candidates; all four fields are required.
struct VITTweetCandidateScribe {
1: required i64 tweetId (personalDataType = 'TweetId')
2: required i64 authorId (personalDataType = 'UserId')
3: required double score
4: required list<metric_tags.MetricTag> metricTags
} (persisted='true', hasPersonalData = 'true')
# Scribe record for a tweet-recommendation request.
# `result` is the Result union, which holds the payload for one stage (or the top-level API call).
struct GetTweetsRecommendationsScribe {
1: required i64 uuid (personalDataType = 'UniversallyUniqueIdentifierUuid') # RequestUUID - unique scribe id for every request that comes in. Same request but different stages of scribe log (FetchCandidate, Filter, etc) share the same uuid
2: required i64 userId (personalDataType = 'UserId')
3: required Result result
4: optional i64 traceId
5: optional PerformanceMetrics performanceMetrics
6: optional list<ImpressesedBucketInfo> impressedBuckets
} (persisted='true', hasPersonalData = 'true')
# Element type of FetchSignalSourcesResult.signals.
struct SourceSignal {
# optional, since that the next step covers all info here
1: optional identifier.InternalId id
} (persisted='true')
# Optional performance data attached to the Get*Scribe records; currently only a latency in milliseconds.
struct PerformanceMetrics {
1: optional i64 latencyMs
} (persisted='true')
# A tweet candidate plus optional metadata; the element type of the `tweets` lists in the
# per-stage result structs of this file (FetchCandidatesResult, PreRankFilterResult, ...).
struct TweetCandidateWithMetadata {
1: required i64 tweetId (personalDataType = 'TweetId')
2: optional candidate_generation_key.CandidateGenerationKey candidateGenerationKey
3: optional i64 authorId (personalDataType = 'UserId') # only for InterleaveResult for hydrating training data
4: optional double score # score with respect to candidateGenerationKey
5: optional data.DataRecord dataRecord # attach any features to this candidate
6: optional i32 numCandidateGenerationKeys # num CandidateGenerationKeys generating this tweetId
} (persisted='true')
# Payload of Result.fetchSignalSourcesResult: the (optional) set of source signals.
struct FetchSignalSourcesResult {
1: optional set<SourceSignal> signals
} (persisted='true')
# Payload of the fetchCandidatesResult alternative, which appears in every *Result union in this file.
struct FetchCandidatesResult {
1: optional list<TweetCandidateWithMetadata> tweets
} (persisted='true')
# Payload of the preRankFilterResult alternative of Result and RelatedTweetResult.
struct PreRankFilterResult {
1: optional list<TweetCandidateWithMetadata> tweets
} (persisted='true')
# Payload of Result.interleaveResult.
struct InterleaveResult {
1: optional list<TweetCandidateWithMetadata> tweets
} (persisted='true')
# Payload of Result.rankResult.
struct RankResult {
1: optional list<TweetCandidateWithMetadata> tweets
} (persisted='true')
# Payload of Result.topLevelApiResult: a CrMixerTweetRequest and its CrMixerTweetResponse, plus a timestamp.
struct TopLevelApiResult {
1: required i64 timestamp (personalDataType = 'PrivateTimestamp')
2: required cr_mixer.CrMixerTweetRequest request
3: required cr_mixer.CrMixerTweetResponse response
} (persisted='true')
# Payload of GetTweetsRecommendationsScribe.result.
# Being a Thrift union, a value holds one of the alternatives below.
union Result {
1: FetchSignalSourcesResult fetchSignalSourcesResult
2: FetchCandidatesResult fetchCandidatesResult
3: PreRankFilterResult preRankFilterResult
4: InterleaveResult interleaveResult
5: RankResult rankResult
6: TopLevelApiResult topLevelApiResult
} (persisted='true', hasPersonalData = 'true')
# Identifies an experiment bucket by experiment id, bucket name and version.
# NOTE(review): the type name is spelled "Impressesed" (sic). The scribe structs in this file reference
# it under this spelling, so a rename would have to be coordinated with every user of the type.
struct ImpressesedBucketInfo {
1: required i64 experimentId (personalDataType = 'ExperimentId')
2: required string bucketName
3: required i32 version
} (persisted='true')
############# RelatedTweets Scribe #############
# Scribe record for a related-tweets request. Unlike the other Get*Scribe structs, it has no required
# userId: the query is keyed by internalId, and requesterId / guestId are both optional.
struct GetRelatedTweetsScribe {
1: required i64 uuid (personalDataType = 'UniversallyUniqueIdentifierUuid') # RequestUUID - unique scribe id for every request that comes in. Same request but different stages of scribe log (FetchCandidate, Filter, etc) share the same uuid
2: required identifier.InternalId internalId
3: required RelatedTweetResult relatedTweetResult
4: optional i64 requesterId (personalDataType = 'UserId')
5: optional i64 guestId (personalDataType = 'GuestId')
6: optional i64 traceId
7: optional PerformanceMetrics performanceMetrics
8: optional list<ImpressesedBucketInfo> impressedBuckets
} (persisted='true', hasPersonalData = 'true')
# Payload of RelatedTweetResult.relatedTweetTopLevelApiResult: a RelatedTweetRequest and its response, plus a timestamp.
struct RelatedTweetTopLevelApiResult {
1: required i64 timestamp (personalDataType = 'PrivateTimestamp')
2: required related_tweet.RelatedTweetRequest request
3: required related_tweet.RelatedTweetResponse response
} (persisted='true')
# Payload of GetRelatedTweetsScribe.relatedTweetResult.
# Being a Thrift union, a value holds one of the alternatives below.
union RelatedTweetResult {
1: RelatedTweetTopLevelApiResult relatedTweetTopLevelApiResult
2: FetchCandidatesResult fetchCandidatesResult
3: PreRankFilterResult preRankFilterResult # results after sequential filters
# if later we need rankResult, we can add it here
} (persisted='true', hasPersonalData = 'true')
############# UtegTweets Scribe #############
# Scribe record for a UTEG tweet request; `utegTweetResult` is the UtegTweetResult union.
struct GetUtegTweetsScribe {
1: required i64 uuid (personalDataType = 'UniversallyUniqueIdentifierUuid') # RequestUUID - unique scribe id for every request that comes in. Same request but different stages of scribe log (FetchCandidate, Filter, etc) share the same uuid
2: required i64 userId (personalDataType = 'UserId')
3: required UtegTweetResult utegTweetResult
4: optional i64 traceId
5: optional PerformanceMetrics performanceMetrics
6: optional list<ImpressesedBucketInfo> impressedBuckets
} (persisted='true', hasPersonalData = 'true')
# Payload of UtegTweetResult.utegTweetTopLevelApiResult: a UtegTweetRequest and its response, plus a timestamp.
struct UtegTweetTopLevelApiResult {
1: required i64 timestamp (personalDataType = 'PrivateTimestamp')
2: required uteg.UtegTweetRequest request
3: required uteg.UtegTweetResponse response
} (persisted='true')
# Payload of GetUtegTweetsScribe.utegTweetResult.
# Being a Thrift union, a value holds one of the alternatives below.
union UtegTweetResult {
1: UtegTweetTopLevelApiResult utegTweetTopLevelApiResult
2: FetchCandidatesResult fetchCandidatesResult
# if later we need rankResult, we can add it here
} (persisted='true', hasPersonalData = 'true')
############# getAdsRecommendations() Scribe #############
# Scribe record for an ads-recommendation request; `result` is the AdsRecommendationsResult union.
struct GetAdsRecommendationsScribe {
1: required i64 uuid (personalDataType = 'UniversallyUniqueIdentifierUuid') # RequestUUID - unique scribe id for every request that comes in. Same request but different stages of scribe log (FetchCandidate, Filter, etc) share the same uuid
2: required i64 userId (personalDataType = 'UserId')
3: required AdsRecommendationsResult result
4: optional i64 traceId
5: optional PerformanceMetrics performanceMetrics
6: optional list<ImpressesedBucketInfo> impressedBuckets
} (persisted='true', hasPersonalData = 'true')
# Payload of AdsRecommendationsResult.adsRecommendationTopLevelApiResult: an AdsRequest and its response, plus a timestamp.
struct AdsRecommendationTopLevelApiResult {
1: required i64 timestamp (personalDataType = 'PrivateTimestamp')
2: required ads.AdsRequest request
3: required ads.AdsResponse response
} (persisted='true')
# Payload of GetAdsRecommendationsScribe.result.
# Being a Thrift union, a value holds one of the alternatives below.
union AdsRecommendationsResult{
1: AdsRecommendationTopLevelApiResult adsRecommendationTopLevelApiResult
2: FetchCandidatesResult fetchCandidatesResult
}(persisted='true', hasPersonalData = 'true')

Binary file not shown.

View File

@ -1,123 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
// For legacy reasons, SourceType used to represent both SourceSignalType and SimilarityEngineType
// Hence, you can see several SourceType such as UserInterestedIn, HashSpace, etc.
// Moving forward, SourceType will be used for SourceSignalType ONLY. eg., TweetFavorite, UserFollow
// We will create a new SimilarityEngineType to separate them. eg., SimClustersANN
// Values are grouped into numeric ranges by category, as labelled by the section comments below.
// Entries commented "Deprecated" are still declared in the enum.
enum SourceType {
// Tweet based Source Signal
TweetFavorite = 0
Retweet = 1
TrafficAttribution = 2 // Traffic Attribution will be migrated over in Q3
OriginalTweet = 3
Reply = 4
TweetShare = 5
GoodTweetClick = 6 // total dwell time > N seconds after click on the tweet
VideoTweetQualityView = 7
VideoTweetPlayback50 = 8
// UserId based Source Signal (includes both Producer/Consumer)
UserFollow = 101
UserRepeatedProfileVisit = 102
CurrentUser_DEPRECATED = 103
RealGraphOon = 104
FollowRecommendation = 105
TwiceUserId = 106
UserTrafficAttributionProfileVisit = 107
GoodProfileClick = 108 // total dwell time > N seconds after click into the profile page
// (Notification) Tweet based Source Signal
NotificationClick = 201
// (Home) Tweet based Source Signal
HomeTweetClick = 301
HomeVideoView = 302
HomeSongbirdShowMore = 303
// Topic based Source Signal
TopicFollow = 401 // Deprecated
PopularTopic = 402 // Deprecated
// Old CR code
UserInterestedIn = 501 // Deprecated
TwiceInterestedIn = 502 // Deprecated
MBCG = 503 // Deprecated
HashSpace = 504 // Deprecated
// Old CR code
Cluster = 601 // Deprecated
// Search based Source Signal
SearchProfileClick = 701 // Deprecated
SearchTweetClick = 702 // Deprecated
// Graph based Source
StrongTiePrediction = 801 // STP
TwiceClustersMembers = 802
Lookalike = 803 // Deprecated
RealGraphIn = 804
// Current requester User Id. It is only used for scribing. Placeholder value
RequestUserId = 1001
// Current request Tweet Id used in RelatedTweet. Placeholder value
RequestTweetId = 1002
// Negative Signals
TweetReport = 1101
TweetDontLike = 1102
TweetSeeFewer = 1103
AccountBlock = 1104
AccountMute = 1105
// Aggregated Signals
TweetAggregation = 1201
ProducerAggregation = 1202
} (persisted='true', hasPersonalData='true')
// Identifies the similarity engine behind a candidate (e.g. TopicTweet.similarityEngineType).
// Entries are not in numeric order: values 21-23 (the "In network" group) are declared after 35,
// and the composite engines use 1001+. Names ending in _DEPRECATED are still declared in the enum.
enum SimilarityEngineType {
SimClustersANN = 1
TweetBasedUserTweetGraph = 2
TweetBasedTwHINANN = 3
Follow2VecANN = 4 // ConsumerEmbeddingBasedFollow2Vec
QIG = 5
OfflineSimClustersANN = 6
LookalikeUTG_DEPRECATED = 7
ProducerBasedUserTweetGraph = 8
FrsUTG_DEPRECATED = 9
RealGraphOonUTG_DEPRECATED = 10
ConsumerEmbeddingBasedTwHINANN = 11
TwhinCollabFilter = 12
TwiceUTG_DEPRECATED = 13
ConsumerEmbeddingBasedTwoTowerANN = 14
TweetBasedBeTANN = 15
StpUTG_DEPRECATED = 16
UTEG = 17
ROMR = 18
ConsumersBasedUserTweetGraph = 19
TweetBasedUserVideoGraph = 20
CertoTopicTweet = 24
ConsumersBasedUserAdGraph = 25
TweetBasedUserAdGraph = 26
SkitTfgTopicTweet = 27
ConsumerBasedWalsANN = 28
ProducerBasedUserAdGraph = 29
SkitHighPrecisionTopicTweet = 30
SkitInterestBrowserTopicTweet = 31
SkitProducerBasedTopicTweet = 32
ExploreTripOfflineSimClustersTweets = 33
DiffusionBasedTweet = 34
ConsumersBasedUserVideoGraph = 35
// In network
EarlybirdRecencyBasedSimilarityEngine = 21
EarlybirdModelBasedSimilarityEngine = 22
EarlybirdTensorflowBasedSimilarityEngine = 23
// Composite
TweetBasedUnifiedSimilarityEngine = 1001
ProducerBasedUnifiedSimilarityEngine = 1002
} (persisted='true')

Binary file not shown.

View File

@ -1,28 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
include "com/twitter/product_mixer/core/client_context.thrift"
include "product.thrift"
include "product_context.thrift"
include "source_type.thrift"
// Request type of the getTopicTweetRecommendations endpoint.
// Field ids jump from 3 to 5; id 4 is not declared here.
struct TopicTweetRequest {
1: required client_context.ClientContext clientContext
2: required product.Product product
3: required list<i64> topicIds
5: optional product_context.ProductContext productContext
6: optional list<i64> excludedTweetIds (personalDataType = 'TweetId')
} (persisted='true', hasPersonalData='true')
// One tweet inside TopicTweetResponse.tweets; all three fields are required.
struct TopicTweet {
1: required i64 tweetId (personalDataType = 'TweetId')
2: required double score
3: required source_type.SimilarityEngineType similarityEngineType
} (persisted='true', hasPersonalData = 'true')
// Response type of the getTopicTweetRecommendations endpoint.
struct TopicTweetResponse {
// NOTE(review): the i64 key is presumably a topic id from TopicTweetRequest.topicIds - confirm against the server code
1: required map<i64, list<TopicTweet>> tweets
} (persisted='true')

Binary file not shown.

View File

@ -1,31 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
include "product.thrift"
include "product_context.thrift"
include "com/twitter/product_mixer/core/client_context.thrift"
include "com/twitter/recos/recos_common.thrift"
# Request type of the getUtegTweetRecommendations endpoint.
# clientContext and product are required; the other fields are optional.
struct UtegTweetRequest {
1: required client_context.ClientContext clientContext
2: required product.Product product
# Product-specific parameters should be placed in the Product Context
3: optional product_context.ProductContext productContext
4: optional list<i64> excludedTweetIds (personalDataType = 'TweetId')
} (persisted='true', hasPersonalData='true')
// One tweet inside UtegTweetResponse.tweets; all three fields are required.
struct UtegTweet {
// tweet id
1: required i64 tweetId(personalDataType = 'TweetId')
// sum of weights of seed users who engaged with the tweet.
// If a user engaged with the same tweet twice, liked it and retweeted it, then his/her weight was counted twice.
2: required double score
// user social proofs per engagement type
// (map from recos_common.SocialProofType to a list of ids annotated as UserId)
3: required map<recos_common.SocialProofType, list<i64>> socialProofByType(personalDataTypeKey='EngagementTypePrivate', personalDataTypeValue='UserId')
} (persisted='true', hasPersonalData = 'true')
// Response type of the getUtegTweetRecommendations endpoint: the list of UtegTweet results.
struct UtegTweetResponse {
1: required list<UtegTweet> tweets
} (persisted='true')

Binary file not shown.

View File

@ -1,19 +0,0 @@
namespace java com.twitter.cr_mixer.thriftjava
#@namespace scala com.twitter.cr_mixer.thriftscala
#@namespace strato com.twitter.cr_mixer
// ValidationErrorCode is used to identify classes of client errors returned from a Product Mixer
// service. Use [[PipelineFailureExceptionMapper]] to adapt pipeline failures into thrift errors.
// Error codes carried in ValidationException.errorCode.
enum ValidationErrorCode {
PRODUCT_DISABLED = 1
PLACEHOLDER_2 = 2
} (hasPersonalData='false')
// A single validation error: an error code plus a message.
// Neither field declares required/optional, so Thrift's default requiredness applies.
exception ValidationException {
1: ValidationErrorCode errorCode
2: string msg
} (hasPersonalData='false')
// Exception wrapping a list of ValidationException values.
// Declared as exception 1 (validationErrors) by the endpoints of the CrMixer service.
exception ValidationExceptionList {
1: list<ValidationException> errors
} (hasPersonalData='false')

Binary file not shown.

Before

Width:  |  Height:  |  Size: 94 KiB

View File

@ -1,48 +0,0 @@
# Without this alias, library :follow-recommendations-service_lib would conflict with :bin
# The alias exposes the `_lib` target under the plain name "follow-recommendations-service".
alias(
name = "follow-recommendations-service",
target = ":follow-recommendations-service_lib",
)
# Aggregate target that only groups dependencies: the server's top-level package and its models package.
target(
name = "follow-recommendations-service_lib",
dependencies = [
"follow-recommendations-service/server/src/main/scala/com/twitter/follow_recommendations",
"follow-recommendations-service/server/src/main/scala/com/twitter/follow_recommendations/models",
],
)
# Runnable JVM binary for the service. The entry point is FollowRecommendationsServiceThriftServerMain
# and the runtime platform is java11. It depends on the service library (through the alias) plus the
# logback, zipkin-scribe, loglens and twitter-server libraries listed below.
jvm_binary(
name = "bin",
basename = "follow-recommendations-service",
main = "com.twitter.follow_recommendations.FollowRecommendationsServiceThriftServerMain",
runtime_platform = "java11",
tags = ["bazel-compatible"],
dependencies = [
":follow-recommendations-service",
"3rdparty/jvm/ch/qos/logback:logback-classic",
"finagle/finagle-zipkin-scribe/src/main/scala",
"finatra/inject/inject-logback/src/main/scala",
"loglens/loglens-logback/src/main/scala/com/twitter/loglens/logback",
"twitter-server-internal/src/main/scala",
"twitter-server/logback-classic/src/main/scala",
],
)
# Aurora Workflows build phase convention requires a jvm_app named with ${project-name}-app
# Packages :bin as a zip archive and bundles the files under server/src/main/resources
# (paths inside the bundle are relative to that directory).
jvm_app(
name = "follow-recommendations-service-app",
archive = "zip",
binary = ":bin",
bundles = [
bundle(
fileset = [
"server/src/main/resources/*",
"server/src/main/resources/**/*",
],
owning_target = "follow-recommendations-service/server/src/main/resources:frs_resources",
relative_to = "server/src/main/resources",
),
],
tags = ["bazel-compatible"],
)

Binary file not shown.

Binary file not shown.

View File

@ -1,24 +0,0 @@
[code-coverage]
package = com.twitter.follow_recommendations
[docbird]
project_name = follow-recommendations-service
project_type = service
; example settings:
;
; project_name = fluffybird
; description = fluffybird is a service for fluffing up feathers.
; tags = python,documentation,fluffybird
; project_type = service
; - allowed options: essay, library, service, hub, cookbook, styleguide, policy
; owner_links = roster
; - allowed options: roster, find, email
; scrolling_tocs = yes
; comments = yes
; verifications = yes
; support_widget = yes
; health_score = yes
; sticky_sidebar = no
[jira]
project = CJREL

Binary file not shown.

Before

Width:  |  Height:  |  Size: 178 KiB

Binary file not shown.

View File

@ -1,40 +0,0 @@
# Follow Recommendations Service
## Introduction to the Follow Recommendations Service (FRS)
The Follow Recommendations Service (FRS) is a robust recommendation engine designed to provide users with personalized suggestions for accounts to follow. At present, FRS supports Who-To-Follow (WTF) module recommendations across a variety of Twitter product interfaces. Additionally, by suggesting tweet authors, FRS also delivers FutureGraph tweet recommendations, which consist of tweets from accounts that users may be interested in following in the future.
## Design
The system is tailored to accommodate diverse use cases, such as Post New-User-Experience (NUX), advertisements, FutureGraph tweets, and more. Each use case features a unique display location identifier. To view all display locations, refer to the following path: `follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models/DisplayLocation.scala`.
Recommendation steps are customized according to each display location. Common and high-level steps are encapsulated within the "RecommendationFlow," which includes operations like candidate generation, ranker selection, filtering, transformation, and beyond. To explore all flows, refer to this path: `follow-recommendations-service/server/src/main/scala/com/twitter/follow_recommendations/flows`.
For each product (corresponding to a display location), one or multiple flows can be selected to generate candidates based on code and configurations. To view all products, refer to the directory `follow-recommendations-service/server/src/main/scala/com/twitter/follow_recommendations/products`; for example, `follow-recommendations-service/server/src/main/scala/com/twitter/follow_recommendations/products/home_timeline_tweet_recs` is one such product.
The FRS overview diagram is depicted below:
![FRS_architecture.png](FRS_architecture.png)
### Candidate Generation
During this step, FRS utilizes various user signals and algorithms to identify candidates from all Twitter accounts. The candidate source folder is located at `follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/candidate_sources/`, with a README file provided within each candidate source folder.
### Filtering
In this phase, FRS applies different filtering logic after generating account candidates to improve quality and health. Filtering may occur before and/or after the ranking step, with heavier filtering logic (e.g., higher latency) typically applied after the ranking step. The filters' folder is located at `follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/predicates`.
### Ranking
During this step, FRS employs both Machine Learning (ML) and heuristic rule-based candidate ranking. For the ML ranker, ML features are fetched beforehand (i.e., feature hydration),
and a DataRecord (the Twitter-standard Machine Learning data format used to represent feature data, labels, and predictions when training or serving) is constructed for each `<user, candidate>` pair.
These pairs are then sent to a separate ML prediction service, which houses the ML model trained offline.
The ML prediction service returns a prediction score, representing the probability that a user will follow and engage with the candidate.
This score is a weighted sum of p(follow|recommendation) and p(positive engagement|follow), and FRS uses this score to rank the candidates.
The rankers' folder is located at `follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/rankers`.
### Transform
In this phase, the sequence of candidates undergoes necessary transformations, such as deduplication, attaching social proof (i.e., "followed by XX user"), adding tracking tokens, and more.
The transformers' folder can be found at `follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/transforms`.
### Truncation
During this final step, FRS trims the candidate pool to a specified size. This process ensures that only the most relevant and engaging candidates are presented to users while maintaining an optimal user experience.
By implementing these comprehensive steps and adapting to various use cases, the Follow Recommendations Service (FRS) effectively curates tailored suggestions for Twitter users, enhancing their overall experience and promoting meaningful connections within the platform.

View File

@ -1,18 +0,0 @@
# Scala library target for this directory, compiled with the "fatal_warnings" compiler option set
# for the java8 platform.
# NOTE(review): no explicit `name` or `sources` are given, so the build tool's defaults presumably apply - confirm.
# The product-mixer recommendation pipeline package is listed under both `dependencies` and `exports`.
scala_library(
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/google/guava",
"configapi/configapi-core/src/main/scala/com/twitter/timelines/configapi",
"finagle/finagle-core/src/main",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
"hermit/hermit-core/src/main/scala/com/twitter/hermit/model",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/functional_component/candidate_source",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/pipeline/recommendation",
"stitch/stitch-core",
],
exports = [
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/pipeline/recommendation",
],
)

View File

@ -1,36 +0,0 @@
package com.twitter.follow_recommendations.common.base
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.follow_recommendations.common.base.EnrichedCandidateSource.toEnriched
import com.twitter.product_mixer.core.functional_component.candidate_source.CandidateSource
import com.twitter.product_mixer.core.model.common.identifier.CandidateSourceIdentifier
/**
 * A helper structure to register and select candidate sources based on identifiers.
 *
 * Implementors provide `statsReceiver` and `sources`; the registry derives a lookup table from
 * them on first use.
 *
 * @tparam Target    the request/target type the candidate sources are applied to
 * @tparam Candidate the candidate type the sources produce
 */
trait CandidateSourceRegistry[Target, Candidate] {

  /** Stats receiver handed to `observe` for every registered source. */
  val statsReceiver: StatsReceiver

  /** The candidate sources to register. Identifiers must be unique across the set. */
  def sources: Set[CandidateSource[Target, Candidate]]

  /**
   * Registered sources keyed by identifier, each wrapped via `observe(statsReceiver)`
   * (available through the imported `EnrichedCandidateSource.toEnriched` conversion).
   *
   * Computed lazily on first access.
   *
   * @throws IllegalArgumentException if two registered sources share an identifier
   */
  final lazy val candidateSources: Map[
    CandidateSourceIdentifier,
    CandidateSource[Target, Candidate]
  ] = {
    // `sources` is a def: evaluate it exactly once so the map and the size check below are
    // guaranteed to look at the same set, and so implementations are not asked to rebuild it.
    val registered = sources

    val map = registered.map { c =>
      c.identifier -> c.observe(statsReceiver)
    }.toMap

    if (map.size != registered.size) {
      // Name the offending identifiers so the misconfiguration can be located from the message.
      val duplicated = registered.toSeq
        .groupBy(_.identifier)
        .collect { case (id, sameId) if sameId.size > 1 => id.name }
        .toSeq
        .sorted
        .mkString(", ")
      throw new IllegalArgumentException(s"Duplicate Candidate Source Identifiers: $duplicated")
    }
    map
  }

  /**
   * Look up the registered (observed) sources for the given identifiers.
   *
   * @param identifiers identifiers of the sources to select
   * @return the matching sources from `candidateSources`
   * @throws NoSuchElementException if any identifier is not registered
   */
  def select(
    identifiers: Set[CandidateSourceIdentifier]
  ): Set[CandidateSource[Target, Candidate]] = {
    // fails loud if the candidate source is not registered
    identifiers.map(candidateSources(_))
  }
}

View File

@ -1,164 +0,0 @@
package com.twitter.follow_recommendations.common.base
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.product_mixer.core.functional_component.candidate_source.CandidateSource
import com.twitter.stitch.Stitch
import com.twitter.util.Duration
import com.twitter.util.TimeoutException
import scala.language.implicitConversions
/**
 * Wraps a CandidateSource with combinators for observing it, re-keying its target, mapping its
 * candidates and bounding it with a timeout. Every combinator returns a new CandidateSource that
 * keeps the identifier of the wrapped one.
 *
 * @param original the candidate source being enriched
 */
class EnrichedCandidateSource[Target, Candidate](original: CandidateSource[Target, Candidate]) {

  /**
   * Intended to gate the candidate source on a Predicate of the target, returning results only
   * when the predicate is Valid.
   *
   * Not implemented in this class: calling it always throws UnsupportedOperationException.
   *
   * @param predicate predicate on the target (unused)
   */
  def gate(predicate: Predicate[Target]): CandidateSource[Target, Candidate] = {
    throw new UnsupportedOperationException()
  }

  /**
   * Wrap the source so every call is profiled through StatsUtil.profileStitchSeqResults, with
   * stats scoped by the source identifier's name.
   */
  def observe(statsReceiver: StatsReceiver): CandidateSource[Target, Candidate] = {
    val sourceId = original.identifier
    val scopedStats = statsReceiver.scope(sourceId.name)
    new CandidateSource[Target, Candidate] {
      val identifier = sourceId
      override def apply(target: Target): Stitch[Seq[Candidate]] =
        StatsUtil.profileStitchSeqResults[Candidate](original(target), scopedStats)
    }
  }

  /**
   * Map target type into new target type (1 to optional mapping).
   * When the mapper yields None the wrapped source is not called and the result is empty.
   */
  def stitchMapKey[Target2](
    targetMapper: Target2 => Stitch[Option[Target]]
  ): CandidateSource[Target2, Candidate] = {
    stitchMapKeys[Target2](target => targetMapper(target).map(_.toSeq))
  }

  /**
   * Map target type into new target type (1 to many mapping).
   * The wrapped source is called once per mapped target and the results are concatenated.
   */
  def stitchMapKeys[Target2](
    targetMapper: Target2 => Stitch[Seq[Target]]
  ): CandidateSource[Target2, Candidate] = {
    new CandidateSource[Target2, Candidate] {
      val identifier = original.identifier
      override def apply(target: Target2): Stitch[Seq[Candidate]] = {
        targetMapper(target).flatMap { mappedTargets =>
          Stitch.traverse(mappedTargets)(original(_)).map(_.flatten)
        }
      }
    }
  }

  /**
   * Map target type into new target type (1 to many mapping) using a synchronous mapper.
   */
  def mapKeys[Target2](
    targetMapper: Target2 => Seq[Target]
  ): CandidateSource[Target2, Candidate] = {
    stitchMapKeys[Target2](target => Stitch.value(targetMapper(target)))
  }

  /**
   * Map candidates to a new type with an asynchronous, optional mapper.
   * Candidates for which the mapper yields None are dropped.
   */
  def mapValues[Candidate2](
    candidateMapper: Candidate => Stitch[Option[Candidate2]]
  ): CandidateSource[Target, Candidate2] = {
    new CandidateSource[Target, Candidate2] {
      val identifier = original.identifier
      override def apply(target: Target): Stitch[Seq[Candidate2]] = {
        for {
          candidates <- original(target)
          mapped <- Stitch.traverse(candidates)(candidateMapper(_))
        } yield mapped.flatten
      }
    }
  }

  /**
   * Map candidates to a new type with a synchronous mapper; every candidate is kept.
   */
  def mapValue[Candidate2](
    candidateMapper: Candidate => Candidate2
  ): CandidateSource[Target, Candidate2] = {
    mapValues[Candidate2](candidate => Stitch.value(Some(candidateMapper(candidate))))
  }

  /**
   * This method wraps the candidate source in a designated timeout so that a single candidate
   * source does not result in a timeout for the entire flow.
   *
   * On a TimeoutException the "timeout" counter (under the source identifier's name) is
   * incremented and an empty result is returned; any other failure is propagated.
   */
  def within(
    candidateTimeout: Duration,
    statsReceiver: StatsReceiver
  ): CandidateSource[Target, Candidate] = {
    val sourceId = original.identifier
    val timeouts = statsReceiver.counter(sourceId.name, "timeout")
    new CandidateSource[Target, Candidate] {
      val identifier = sourceId
      override def apply(target: Target): Stitch[Seq[Candidate]] = {
        val timeBounded =
          original(target).within(candidateTimeout)(com.twitter.finagle.util.DefaultTimer)
        timeBounded.rescue {
          case _: TimeoutException =>
            timeouts.incr()
            Stitch.Nil
        }
      }
    }
  }

  /**
   * Like `within`, but fails open for every Exception, not only timeouts: a timeout increments
   * the "timeout" counter, any other Exception increments a counter named after the exception's
   * simple class name under "candidate_source_error" / the source identifier's name. In both
   * cases an empty result is returned.
   */
  def failOpenWithin(
    candidateTimeout: Duration,
    statsReceiver: StatsReceiver
  ): CandidateSource[Target, Candidate] = {
    val sourceId = original.identifier
    val timeouts = statsReceiver.counter(sourceId.name, "timeout")
    new CandidateSource[Target, Candidate] {
      val identifier = sourceId
      override def apply(target: Target): Stitch[Seq[Candidate]] = {
        val timeBounded =
          original(target).within(candidateTimeout)(com.twitter.finagle.util.DefaultTimer)
        timeBounded.handle {
          case _: TimeoutException =>
            timeouts.incr()
            Seq.empty
          case e: Exception =>
            val errorCounter = statsReceiver
              .scope("candidate_source_error")
              .scope(sourceId.name)
              .counter(e.getClass.getSimpleName)
            errorCounter.incr()
            Seq.empty
        }
      }
    }
  }
}

object EnrichedCandidateSource {

  /** Implicit view that makes the enrichment combinators available on any CandidateSource. */
  implicit def toEnriched[K, V](original: CandidateSource[K, V]): EnrichedCandidateSource[K, V] = {
    new EnrichedCandidateSource[K, V](original)
  }
}

View File

@ -1,17 +0,0 @@
package com.twitter.follow_recommendations.common.base
import com.twitter.follow_recommendations.common.models.FilterReason.ParamReason
import com.twitter.stitch.Stitch
import com.twitter.timelines.configapi.HasParams
import com.twitter.timelines.configapi.Param
/**
 * Predicate driven by a boolean param of the request: Valid when the param is true, otherwise
 * Invalid with a ParamReason carrying the param's stat name.
 *
 * @param param boolean param read from the request's params
 */
case class ParamPredicate[Request <: HasParams](param: Param[Boolean]) extends Predicate[Request] {
  def apply(request: Request): Stitch[PredicateResult] = {
    val result: PredicateResult =
      if (request.params(param)) PredicateResult.Valid
      else PredicateResult.Invalid(Set(ParamReason(param.statName)))
    Stitch.value(result)
  }
}

View File

@ -1,282 +0,0 @@
package com.twitter.follow_recommendations.common.base
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.follow_recommendations.common.models.FilterReason
import com.twitter.stitch.Arrow
import com.twitter.stitch.Stitch
/**
 * An asynchronous check on an item of type Q that yields a PredicateResult
 * (Valid, or Invalid with a set of reasons), plus combinators to compose such checks.
 *
 * @tparam Q type of the item being checked
 */
trait Predicate[-Q] {

  /** Evaluate the predicate for a single item. */
  def apply(item: Q): Stitch[PredicateResult]

  /** Arrow form of this predicate. */
  def arrow: Arrow[Q, PredicateResult] = Arrow.apply(apply)

  /** Build a predicate on K by first converting each K into a Q with `mapper`. */
  def map[K](mapper: K => Q): Predicate[K] = Predicate(arrow.contramap(mapper))

  /**
   * check the predicate results for a batch of items for convenience.
   *
   * mark it as final to avoid potential abuse usage
   */
  final def batch(items: Seq[Q]): Stitch[Seq[PredicateResult]] =
    arrow.traverse(items)

  /**
   * Syntax sugar for functions which take in 2 inputs as a tuple.
   */
  def apply[Q1, Q2](item1: Q1, item2: Q2)(implicit ev: ((Q1, Q2)) => Q): Stitch[PredicateResult] = {
    // The implicit evidence converts the pair into the item type this predicate accepts.
    apply(ev((item1, item2)))
  }

  /**
   * Runs the predicates in sequence. The returned predicate is Valid iff both predicates are
   * Valid, i.e. it is an AND operation.
   *
   * Evaluation short-circuits: `p` is not evaluated when this predicate is Invalid, and the
   * Invalid result of this predicate is returned as is.
   *
   * @param p predicate to run in sequence
   *
   * @return a new predicate object that represents the logical AND of both predicates
   */
  def andThen[Q1 <: Q](p: Predicate[Q1]): Predicate[Q1] = {
    Predicate { query: Q1 =>
      apply(query).flatMap {
        case PredicateResult.Valid => p(query)
        case invalid: PredicateResult.Invalid => Stitch.value(invalid)
      }
    }
  }

  /**
   * Creates a predicate which runs the current & given predicate in sequence.
   * The returned predicate is Valid if either the current or the given predicate is Valid.
   * The given predicate is only run when the current predicate is Invalid.
   *
   * @param p predicate to run in sequence
   *
   * @return new predicate object that represents the logical OR of both predicates.
   *         if both are invalid, the reason would be the set of all invalid reasons.
   */
  def or[Q1 <: Q](p: Predicate[Q1]): Predicate[Q1] = {
    Predicate { query: Q1 =>
      apply(query).flatMap {
        case PredicateResult.Valid => Stitch.value(PredicateResult.Valid)
        case PredicateResult.Invalid(reasons) =>
          p(query).map[PredicateResult] {
            case PredicateResult.Valid => PredicateResult.Valid
            case PredicateResult.Invalid(newReasons) =>
              PredicateResult.Invalid(reasons ++ newReasons)
          }
      }
    }
  }

  /*
   * Runs the predicate only if the provided predicate is valid, otherwise returns valid.
   * */
  def gate[Q1 <: Q](gatingPredicate: Predicate[Q1]): Predicate[Q1] = {
    Predicate { query: Q1 =>
      gatingPredicate(query).flatMap {
        case PredicateResult.Valid => apply(query)
        case PredicateResult.Invalid(_) => Stitch.value(PredicateResult.Valid)
      }
    }
  }

  /** Wrap this predicate so its results are profiled via StatsUtil.profilePredicateResult. */
  def observe(statsReceiver: StatsReceiver): Predicate[Q] = {
    val profiledArrow = StatsUtil.profilePredicateResult(this.arrow, statsReceiver)
    Predicate(profiledArrow)
  }

  /**
   * Make this predicate fail open: when evaluation fails with an Exception, the given
   * `resultType` is returned instead of the failure.
   */
  def convertToFailOpenWithResultType(resultType: PredicateResult): Predicate[Q] = {
    Predicate { query: Q =>
      apply(query).handle { case _: Exception => resultType }
    }
  }
}
/** Predicate that accepts every item: it always returns Predicate.AlwaysTrueStitch. */
class TruePredicate[Q] extends Predicate[Q] {
  override def apply(item: Q): Stitch[PredicateResult] = {
    Predicate.AlwaysTrueStitch
  }
}
/**
 * Predicate that rejects every item with a single, fixed reason.
 *
 * @param reason the reason reported for every item
 */
class FalsePredicate[Q](reason: FilterReason) extends Predicate[Q] {

  /** The Invalid result, built once and shared by every call. */
  val InvalidResult = Stitch.value(PredicateResult.Invalid(Set(reason)))

  override def apply(item: Q): Stitch[PredicateResult] = {
    InvalidResult
  }
}
object Predicate {

  /** Pre-built stitch holding PredicateResult.Valid. */
  val AlwaysTrueStitch = Stitch.value(PredicateResult.Valid)

  /** Name of the stat recording how many batches batchFilterTake evaluated. */
  val NumBatchesStat = "num_batches_stats"

  /** Name of the counter accumulating how many batches batchFilterTake evaluated. */
  val NumBatchesCount = "num_batches"

  /** Build a predicate from a function. */
  def apply[Q](func: Q => Stitch[PredicateResult]): Predicate[Q] = new Predicate[Q] {
    override val arrow: Arrow[Q, PredicateResult] = Arrow(func)
    override def apply(item: Q): Stitch[PredicateResult] = func(item)
  }

  /** Build a predicate from an arrow; `apply` delegates to that arrow. */
  def apply[Q](outerArrow: Arrow[Q, PredicateResult]): Predicate[Q] = new Predicate[Q] {
    override val arrow: Arrow[Q, PredicateResult] = outerArrow
    override def apply(item: Q): Stitch[PredicateResult] = arrow(item)
  }

  /**
   * Given some items, this function
   * 1. chunks them up in groups
   * 2. lazily applies a predicate on each group
   * 3. filters based on the predicate
   * 4. takes first numToTake items.
   *
   * If numToTake is satisfied, then any later batches are not evaluated.
   * The number of evaluated batches is recorded under NumBatchesStat and NumBatchesCount.
   *
   * @param items     items of type Q
   * @param predicate predicate that determines whether an item is acceptable
   * @param batchSize batch size to call the predicate with
   * @param numToTake max number of items to return
   * @param stats     stats receiver
   * @tparam Q type of item
   *
   * @return a stitch of the accepted items, in input order
   */
  def batchFilterTake[Q](
    items: Seq[Q],
    predicate: Predicate[Q],
    batchSize: Int,
    numToTake: Int,
    stats: StatsReceiver
  ): Stitch[Seq[Q]] = {

    // Pulls batches one at a time until enough items are accepted or the batches run out.
    // Returns the accepted items together with the number of batches that were evaluated.
    def takeFromBatches(
      pending: Iterator[Stitch[Seq[Q]]],
      accepted: Seq[Q],
      stillNeeded: Int,
      batchesRun: Int
    ): Stitch[(Seq[Q], Int)] = {
      if (!pending.hasNext) {
        Stitch.value((accepted, batchesRun))
      } else {
        pending.next().flatMap { passed =>
          val taken = passed.take(stillNeeded)
          val combined = accepted ++ taken
          val remaining = stillNeeded - taken.size
          if (remaining > 0) takeFromBatches(pending, combined, remaining, batchesRun + 1)
          else Stitch.value((combined, batchesRun + 1))
        }
      }
    }

    // The iterator is lazy, so the predicate only runs for batches that are actually pulled.
    val batchedItems = items.view.grouped(batchSize)
    val batchedFutures = batchedItems.map { batch =>
      Stitch.traverse(batch)(predicate.apply).map { conds =>
        (batch.zip(conds)).withFilter(_._2.value).map(_._1)
      }
    }

    takeFromBatches(batchedFutures, Nil, numToTake, 0).map {
      case (filtered, numOfBatch) =>
        stats.stat(NumBatchesStat).add(numOfBatch)
        stats.counter(NumBatchesCount).incr(numOfBatch)
        filtered
    }
  }

  /**
   * filter a list of items based on the predicate
   *
   * @param items     a list of items
   * @param predicate predicate of the item
   * @tparam Q item type
   * @return the list of items that satisfy the predicate
   */
  def filter[Q](items: Seq[Q], predicate: Predicate[Q]): Stitch[Seq[Q]] = {
    for (results <- predicate.batch(items)) yield {
      val itemsWithResults = items.zip(results)
      itemsWithResults.collect { case (item, PredicateResult.Valid) => item }
    }
  }

  /**
   * filter a list of items based on the predicate given the target
   *
   * @param target    target item
   * @param items     a list of items
   * @param predicate predicate of the (target, item) pair
   * @tparam Q item type
   * @return the list of items that satisfy the predicate given the target
   */
  def filter[T, Q](target: T, items: Seq[Q], predicate: Predicate[(T, Q)]): Stitch[Seq[Q]] = {
    val targetItemPairs = items.map(item => (target, item))
    for (results <- predicate.batch(targetItemPairs)) yield {
      val itemsWithResults = items.zip(results)
      itemsWithResults.collect { case (item, PredicateResult.Valid) => item }
    }
  }

  /**
   * Returns a predicate that is Valid for an element iff every input predicate is Valid for it,
   * i.e. it is an AND operation. When invalid, the reasons of all invalid predicates are merged.
   *
   * All predicates are evaluated through a single Stitch.traverse (no short-circuiting).
   *
   * @param predicates list of predicates
   * @tparam Q Type parameter
   *
   * @return new predicate object that is the logical "and" of the input predicates
   */
  def andConcurrently[Q](predicates: Seq[Predicate[Q]]): Predicate[Q] = {
    Predicate { query: Q =>
      Stitch.traverse(predicates)(p => p(query)).map { predicateResults =>
        val invalidReasonSets = predicateResults.collect {
          case PredicateResult.Invalid(reasons) => reasons
        }
        if (invalidReasonSets.isEmpty) PredicateResult.Valid
        else PredicateResult.Invalid(invalidReasonSets.reduce(_ ++ _))
      }
    }
  }
}
/**
 * Applies the underlying predicate only when `gate` is on for the item; otherwise the item is
 * treated as Valid without evaluating the underlying predicate.
 *
 * Counters (on the given stats receiver):
 *  - underlying_total:   items for which the underlying predicate was invoked
 *  - underlying_valid:   invocations of the underlying predicate that resolved to Valid
 *  - underlying_invalid: invocations of the underlying predicate that resolved to Invalid
 *  - not_gated:          items for which the gate was off
 *
 * @param underlyingPredicate predicate to apply when the gate is on
 * @param stats               stats receiver for the counters above (defaults to NullStatsReceiver)
 */
abstract class GatedPredicateBase[Q](
  underlyingPredicate: Predicate[Q],
  stats: StatsReceiver = NullStatsReceiver)
    extends Predicate[Q] {

  /** Whether the underlying predicate should be applied to this item. */
  def gate(item: Q): Boolean

  val underlyingPredicateTotal = stats.counter("underlying_total")
  val underlyingPredicateValid = stats.counter("underlying_valid")
  val underlyingPredicateInvalid = stats.counter("underlying_invalid")
  val notGatedCounter = stats.counter("not_gated")

  val ValidStitch: Stitch[PredicateResult.Valid.type] = Stitch.value(PredicateResult.Valid)

  override def apply(item: Q): Stitch[PredicateResult] = {
    if (gate(item)) {
      underlyingPredicateTotal.incr()
      // Record the outcome so the valid / invalid counters reflect what the underlying
      // predicate decided; previously they were registered but never incremented.
      underlyingPredicate(item).onSuccess {
        case PredicateResult.Valid => underlyingPredicateValid.incr()
        case PredicateResult.Invalid(_) => underlyingPredicateInvalid.incr()
      }
    } else {
      notGatedCounter.incr()
      ValidStitch
    }
  }
}

View File

@ -1,18 +0,0 @@
package com.twitter.follow_recommendations.common.base
import com.twitter.follow_recommendations.common.models.FilterReason
/** Outcome of evaluating a Predicate: either Valid or Invalid with a set of reasons. */
sealed trait PredicateResult {

  /** true for Valid, false for Invalid. */
  def value: Boolean
}

object PredicateResult {

  /** The item passed the predicate. */
  case object Valid extends PredicateResult {
    override val value: Boolean = true
  }

  /**
   * The item failed the predicate.
   *
   * @param reasons why the item was rejected; empty by default
   */
  case class Invalid(reasons: Set[FilterReason] = Set.empty) extends PredicateResult {
    override val value: Boolean = false
  }
}

View File

@ -1,90 +0,0 @@
package com.twitter.follow_recommendations.common.base
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.stitch.Stitch
import com.twitter.util.Duration
import com.twitter.util.TimeoutException
/**
 * Ranker is a special kind of transform that would only change the order of a list of items.
 * If a single item is given, it "may" attach additional scoring information to the item.
 *
 * `transform` is implemented by delegating to `rank`.
 *
 * @tparam Target    target to recommend the candidates
 * @tparam Candidate candidate type to rank
 */
trait Ranker[Target, Candidate] extends Transform[Target, Candidate] { ranker =>

  /** Rank the candidates for the given target. */
  def rank(target: Target, candidates: Seq[Candidate]): Stitch[Seq[Candidate]]

  override def transform(target: Target, candidates: Seq[Candidate]): Stitch[Seq[Candidate]] =
    rank(target, candidates)

  /**
   * Wrap this ranker so each call records the number of input candidates
   * (Transform.InputCandidatesCount / Transform.InputCandidatesStat) and is profiled through
   * StatsUtil.profileStitchSeqResults.
   */
  override def observe(statsReceiver: StatsReceiver): Ranker[Target, Candidate] =
    new Ranker[Target, Candidate] {
      override def rank(target: Target, items: Seq[Candidate]): Stitch[Seq[Candidate]] = {
        val inputSize = items.size
        statsReceiver.counter(Transform.InputCandidatesCount).incr(inputSize)
        statsReceiver.stat(Transform.InputCandidatesStat).add(inputSize)
        StatsUtil.profileStitchSeqResults(ranker.rank(target, items), statsReceiver)
      }
    }

  /** A ranker that returns this ranker's output in reverse order. */
  def reverse: Ranker[Target, Candidate] = new Ranker[Target, Candidate] {
    def rank(target: Target, candidates: Seq[Candidate]): Stitch[Seq[Candidate]] = {
      for (ranked <- ranker.rank(target, candidates)) yield ranked.reverse
    }
  }

  /** A ranker that feeds the output of this ranker into `other`. */
  def andThen(other: Ranker[Target, Candidate]): Ranker[Target, Candidate] =
    new Ranker[Target, Candidate] {
      def rank(target: Target, candidates: Seq[Candidate]): Stitch[Seq[Candidate]] =
        ranker.rank(target, candidates).flatMap(other.rank(target, _))
    }

  /**
   * This method wraps the Ranker in a designated timeout.
   * If the ranker times out, the "timeout" counter is incremented and the original candidates
   * are returned unchanged, instead of failing the whole recommendation flow.
   * Failures other than TimeoutException are propagated.
   */
  def within(timeout: Duration, statsReceiver: StatsReceiver): Ranker[Target, Candidate] = {
    val timeouts = statsReceiver.counter("timeout")
    new Ranker[Target, Candidate] {
      override def rank(target: Target, candidates: Seq[Candidate]): Stitch[Seq[Candidate]] = {
        val timeBounded =
          ranker.rank(target, candidates).within(timeout)(com.twitter.finagle.util.DefaultTimer)
        timeBounded.rescue {
          case _: TimeoutException =>
            timeouts.incr()
            Stitch.value(candidates)
        }
      }
    }
  }
}

object Ranker {

  /**
   * Build a ranker that first applies `transformer` to the candidates and then ranks the
   * transformed candidates with `ranker`.
   */
  def chain[Target, Candidate](
    transformer: Transform[Target, Candidate],
    ranker: Ranker[Target, Candidate]
  ): Ranker[Target, Candidate] = {
    new Ranker[Target, Candidate] {
      def rank(target: Target, candidates: Seq[Candidate]): Stitch[Seq[Candidate]] = {
        val transformed = transformer.transform(target, candidates)
        transformed.flatMap(ranker.rank(target, _))
      }
    }
  }
}
/** Ranker that returns the candidates unchanged, in their original order. */
class IdentityRanker[Target, Candidate] extends Ranker[Target, Candidate] {
  def rank(target: Target, candidates: Seq[Candidate]): Stitch[Seq[Candidate]] = {
    Stitch.value(candidates)
  }
}

View File

@ -1,250 +0,0 @@
package com.twitter.follow_recommendations.common.base
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.product_mixer.core.functional_component.candidate_source.CandidateSource
import com.twitter.product_mixer.core.model.common.UniversalNoun
import com.twitter.product_mixer.core.model.common.identifier.RecommendationPipelineIdentifier
import com.twitter.product_mixer.core.pipeline.recommendation.RecommendationPipelineResult
import com.twitter.product_mixer.core.quality_factor.QualityFactorObserver
import com.twitter.stitch.Stitch
/**
 * Configuration for the results produced by a recommendation flow.
 *
 * @param desiredCandidateCount   number of candidates the flow should try to return
 * @param batchForCandidatesCheck batch size used when checking candidates
 */
case class RecommendationResultsConfig(
  desiredCandidateCount: Int,
  batchForCandidatesCheck: Int)
/**
 * Minimal contract of a recommendation flow: turn a request into a pipeline result.
 *
 * @tparam Target    type of the request
 * @tparam Candidate type of the returned candidates
 */
trait BaseRecommendationFlow[Target, Candidate <: UniversalNoun[Long]] {

  val identifier = RecommendationPipelineIdentifier("RecommendationFlow")

  /** Run the flow for the given request. */
  def process(
    pipelineRequest: Target
  ): Stitch[RecommendationPipelineResult[Candidate, Seq[Candidate]]]

  /**
   * Adapt this flow to a different request type by converting each incoming request with `fn`
   * before delegating to this flow's `process`.
   */
  def mapKey[Target2](fn: Target2 => Target): BaseRecommendationFlow[Target2, Candidate] = {
    val underlying = this
    new BaseRecommendationFlow[Target2, Candidate] {
      override def process(
        pipelineRequest: Target2
      ): Stitch[RecommendationPipelineResult[Candidate, Seq[Candidate]]] = {
        val mappedRequest = fn(pipelineRequest)
        underlying.process(mappedRequest)
      }
    }
  }
}
/**
 * Defines a typical recommendation flow to fetch, filter, rank and transform candidates.
 *
 * 1. targetEligibility: determine the eligibility of target request
 * 2. candidateSources: fetch candidates from candidate sources based on target type
 * 3. preRankerCandidateFilter: light filtering of candidates
 * 4. ranker: ranking of candidates (could be composed of multiple stages, light ranking, heavy ranking and etc)
 * 5. postRankerTransform: deduping, grouping, rule based promotion / demotions and etc
 * 6. validateCandidates: heavy filters to determine the eligibility of the candidates.
 *    will only be applied to candidates that we expect to return.
 * 7. transformResults: transform the individual candidates into desired format (e.g. hydrate social proof)
 *
 * Note that the actual implementations may not need to implement all the steps if not needed
 * (could just leave to IdentityRanker if ranking is not needed).
 *
 * Theoretically, the actual implementation could override the above flow to add
 * more steps (e.g. add a transform step before ranking).
 * But it is recommended to add the additional steps into this base flow if the step proves
 * to have significant justification, or merge it into an existing step if it is a minor change.
 *
 * @tparam Target type of target request
 * @tparam Candidate type of candidate to return
 */
trait RecommendationFlow[Target, Candidate <: UniversalNoun[Long]]
    extends BaseRecommendationFlow[Target, Candidate]
    with SideEffectsUtil[Target, Candidate] {

  /**
   * optionally update or enrich the request before executing the flows
   */
  protected def updateTarget(target: Target): Stitch[Target] = Stitch.value(target)

  /**
   * check if the target is eligible for the flow
   */
  protected def targetEligibility: Predicate[Target]

  /**
   * define the candidate sources that should be used for the given target
   */
  protected def candidateSources(target: Target): Seq[CandidateSource[Target, Candidate]]

  /**
   * filter invalid candidates before the ranking phase.
   */
  protected def preRankerCandidateFilter: Predicate[(Target, Candidate)]

  /**
   * rank the candidates
   */
  protected def selectRanker(target: Target): Ranker[Target, Candidate]

  /**
   * transform the candidates after ranking (e.g. deduping, grouping and etc)
   */
  protected def postRankerTransform: Transform[Target, Candidate]

  /**
   * filter invalid candidates before returning the results.
   *
   * Some heavy filters e.g. SGS filter could be applied in this step
   */
  protected def validateCandidates: Predicate[(Target, Candidate)]

  /**
   * transform the candidates into results and return
   */
  protected def transformResults: Transform[Target, Candidate]

  /**
   * configuration for recommendation results
   */
  protected def resultsConfig(target: Target): RecommendationResultsConfig

  /**
   * track the quality factor of the recommendation pipeline
   */
  protected def qualityFactorObserver: Option[QualityFactorObserver] = None

  def statsReceiver: StatsReceiver

  /*
   * high level monitoring for the whole flow
   * (make sure to add monitoring for each individual component by yourself)
   *
   * additional candidates: count, stats, non_empty_count
   * target eligibility: latency, success, failures, request, count, valid_count, invalid_count, invalid_reasons
   * candidate generation: latency, success, failures, request, count, non_empty_count, results_stat
   * pre ranker filter: latency, success, failures, request, count, non_empty_count, results_stat
   * ranker: latency, success, failures, request, count, non_empty_count, results_stat
   * post ranker: latency, success, failures, request, count, non_empty_count, results_stat
   * filter and take: latency, success, failures, request, count, non_empty_count, results_stat, batch count
   * transform results: latency, success, failures, request, count, non_empty_count, results_stat
   */
  import RecommendationFlow._
  lazy val additionalCandidatesStats = statsReceiver.scope(AdditionalCandidatesStats)
  lazy val targetEligibilityStats = statsReceiver.scope(TargetEligibilityStats)
  lazy val candidateGenerationStats = statsReceiver.scope(CandidateGenerationStats)
  lazy val preRankerFilterStats = statsReceiver.scope(PreRankerFilterStats)
  lazy val rankerStats = statsReceiver.scope(RankerStats)
  lazy val postRankerTransformStats = statsReceiver.scope(PostRankerTransformStats)
  lazy val filterAndTakeStats = statsReceiver.scope(FilterAndTakeStats)
  lazy val transformResultsStats = statsReceiver.scope(TransformResultsStats)
  lazy val overallStats = statsReceiver.scope(OverallStats)

  import StatsUtil._

  /**
   * Entry point of the flow: updates the target, checks its eligibility and, when eligible,
   * runs the candidate pipeline. An ineligible target yields an empty candidate list.
   * The whole run is profiled under the overall stats and reported to the optional quality
   * factor observer.
   */
  override def process(
    pipelineRequest: Target
  ): Stitch[RecommendationPipelineResult[Candidate, Seq[Candidate]]] = {
    val eligibleCandidates: Stitch[Seq[Candidate]] =
      updateTarget(pipelineRequest).flatMap { target =>
        val eligibility = profilePredicateResult(targetEligibility(target), targetEligibilityStats)
        eligibility.flatMap {
          case PredicateResult.Valid => processValidTarget(target, Seq.empty)
          case PredicateResult.Invalid(_) => Stitch.Nil
        }
      }

    observeStitchQualityFactor(
      profileStitchSeqResults(eligibleCandidates, overallStats).map { candidates =>
        RecommendationPipelineResult.empty.withResult(candidates)
      },
      qualityFactorObserver,
      overallStats
    )
  }

  /**
   * Runs the candidate pipeline for an eligible target. Each stage is profiled under its own
   * stats scope:
   *
   * 1. fetch candidates from the candidate sources
   * 2. blend the fetched candidates with the additional candidates
   * 3. filter the candidates (light filters) with preRankerCandidateFilter
   * 4. rank with the ranker selected for the target
   * 5. apply postRankerTransform
   * 6. filter and truncate with validateCandidates, using resultsConfig
   * 7. apply transformResults
   * 8. apply side effects on all the intermediate results
   *
   * @param target               the eligible target
   * @param additionalCandidates candidates to blend in ahead of the fetched ones
   */
  protected def processValidTarget(
    target: Target,
    additionalCandidates: Seq[Candidate]
  ): Stitch[Seq[Candidate]] = {
    val sourcesToQuery = candidateSources(target)
    for {
      fetched <- profileStitchSeqResults(
        Stitch.traverse(sourcesToQuery)(source => source(target)).map(_.flatten),
        candidateGenerationStats
      )
      merged = profileSeqResults(additionalCandidates, additionalCandidatesStats) ++ fetched
      filtered <- profileStitchSeqResults(
        Predicate.filter(target, merged, preRankerCandidateFilter),
        preRankerFilterStats
      )
      ranked <- profileStitchSeqResults(
        selectRanker(target).rank(target, filtered),
        rankerStats
      )
      transformed <- profileStitchSeqResults(
        postRankerTransform.transform(target, ranked),
        postRankerTransformStats
      )
      truncated <- profileStitchSeqResults(
        take(target, transformed, resultsConfig(target)),
        filterAndTakeStats
      )
      results <- profileStitchSeqResults(
        transformResults.transform(target, truncated),
        transformResultsStats
      )
      _ <- applySideEffects(
        target,
        sourcesToQuery,
        fetched,
        merged,
        filtered,
        ranked,
        transformed,
        truncated,
        results)
    } yield results
  }

  /**
   * Validates candidates in batches with validateCandidates and keeps up to
   * `config.desiredCandidateCount` of the valid ones.
   */
  private[this] def take(
    target: Target,
    candidates: Seq[Candidate],
    config: RecommendationResultsConfig
  ): Stitch[Seq[Candidate]] = {
    val targetCandidatePairs = candidates.map(candidate => (target, candidate))
    val accepted = Predicate.batchFilterTake(
      targetCandidatePairs,
      validateCandidates,
      config.batchForCandidatesCheck,
      config.desiredCandidateCount,
      // NOTE(review): the batch stats go to the flow's root receiver rather than
      // filterAndTakeStats; confirm whether that is intended.
      statsReceiver
    )
    accepted.map(_.map { case (_, candidate) => candidate })
  }
}
/** Stats scope names used by RecommendationFlow, one per stage of the flow. */
object RecommendationFlow {
  val AdditionalCandidatesStats: String = "additional_candidates"
  val TargetEligibilityStats: String = "target_eligibility"
  val CandidateGenerationStats: String = "candidate_generation"
  val PreRankerFilterStats: String = "pre_ranker_filter"
  val RankerStats: String = "ranker"
  val PostRankerTransformStats: String = "post_ranker_transform"
  val FilterAndTakeStats: String = "filter_and_take"
  val TransformResultsStats: String = "transform_results"
  val OverallStats: String = "overall"
}

View File

@ -1,24 +0,0 @@
package com.twitter.follow_recommendations.common.base
import com.twitter.product_mixer.core.functional_component.candidate_source.CandidateSource
import com.twitter.stitch.Stitch
/**
 * SideEffectsUtil is the hook through which a recommendation flow pipeline applies side effects
 * to its intermediate candidate results.
 *
 * @tparam Target    target to recommend the candidates
 * @tparam Candidate candidate type to rank
 */
trait SideEffectsUtil[Target, Candidate] {

  /**
   * Apply side effects given the intermediate results of a flow run.
   * The default implementation does nothing and returns Stitch.Unit.
   *
   * @param target                         the target of the flow
   * @param candidateSources               the candidate sources that were queried
   * @param candidatesFromCandidateSources candidates fetched from the candidate sources
   * @param mergedCandidates               candidates after merging
   * @param filteredCandidates             candidates after filtering
   * @param rankedCandidates               candidates after ranking
   * @param transformedCandidates          candidates after transformation
   * @param truncatedCandidates            candidates after truncation
   * @param results                        the final results
   */
  def applySideEffects(
    target: Target,
    candidateSources: Seq[CandidateSource[Target, Candidate]],
    candidatesFromCandidateSources: Seq[Candidate],
    mergedCandidates: Seq[Candidate],
    filteredCandidates: Seq[Candidate],
    rankedCandidates: Seq[Candidate],
    transformedCandidates: Seq[Candidate],
    truncatedCandidates: Seq[Candidate],
    results: Seq[Candidate]
  ): Stitch[Unit] = {
    Stitch.Unit
  }
}

View File

@ -1,272 +0,0 @@
package com.twitter.follow_recommendations.common.base
import com.twitter.finagle.stats.Stat
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.product_mixer.core.quality_factor.QualityFactorObserver
import com.twitter.stitch.Arrow
import com.twitter.stitch.Stitch
import com.twitter.util.Stopwatch
import java.util.concurrent.TimeUnit
import scala.util.control.NonFatal
object StatsUtil {
val LatencyName = "latency_ms"
val RequestName = "requests"
val SuccessName = "success"
val FailureName = "failures"
val ResultsName = "results"
val ResultsStat = "results_stat"
val EmptyResultsName = "empty"
val NonEmptyResultsName = "non_empty"
val ValidCount = "valid"
val InvalidCount = "invalid"
val InvalidHasReasons = "has_reasons"
val Reasons = "reasons"
val QualityFactorStat = "quality_factor_stat"
val QualityFactorCounts = "quality_factor_counts"
/**
* Helper function for timing a stitch, returning the original stitch.
*/
def profileStitch[T](stitch: Stitch[T], stat: StatsReceiver): Stitch[T] = {
Stitch
.time(stitch)
.map {
case (response, stitchRunDuration) =>
stat.counter(RequestName).incr()
stat.stat(LatencyName).add(stitchRunDuration.inMilliseconds)
response
.onSuccess { _ => stat.counter(SuccessName).incr() }
.onFailure { e =>
stat.counter(FailureName).incr()
stat.scope(FailureName).counter(getCleanClassName(e)).incr()
}
}
.lowerFromTry
}
/**
* Helper function for timing an arrow, returning the original arrow.
*/
def profileArrow[T, U](arrow: Arrow[T, U], stat: StatsReceiver): Arrow[T, U] = {
Arrow
.time(arrow)
.map {
case (response, stitchRunDuration) =>
stat.counter(RequestName).incr()
stat.stat(LatencyName).add(stitchRunDuration.inMilliseconds)
response
.onSuccess { _ => stat.counter(SuccessName).incr() }
.onFailure { e =>
stat.counter(FailureName).incr()
stat.scope(FailureName).counter(getCleanClassName(e)).incr()
}
}
.lowerFromTry
}
/**
* Helper function to count and track the distribution of results
*/
def profileResults[T](results: T, stat: StatsReceiver, size: T => Int): T = {
val numResults = size(results)
stat.counter(ResultsName).incr(numResults)
if (numResults == 0) {
stat.counter(EmptyResultsName).incr()
results
} else {
stat.stat(ResultsStat).add(numResults)
stat.counter(NonEmptyResultsName).incr()
results
}
}
/**
* Helper function to count and track the distribution of a list of results
*/
def profileSeqResults[T](results: Seq[T], stat: StatsReceiver): Seq[T] = {
profileResults[Seq[T]](results, stat, _.size)
}
/**
* Helper function for timing a stitch and count the number of results, returning the original stitch.
*/
def profileStitchResults[T](stitch: Stitch[T], stat: StatsReceiver, size: T => Int): Stitch[T] = {
profileStitch(stitch, stat).onSuccess { results => profileResults(results, stat, size) }
}
/**
* Helper function for timing an arrow and count the number of results, returning the original arrow.
*/
def profileArrowResults[T, U](
arrow: Arrow[T, U],
stat: StatsReceiver,
size: U => Int
): Arrow[T, U] = {
profileArrow(arrow, stat).onSuccess { results => profileResults(results, stat, size) }
}
/**
* Helper function for timing a stitch and count a seq of results, returning the original stitch.
*/
def profileStitchSeqResults[T](stitch: Stitch[Seq[T]], stat: StatsReceiver): Stitch[Seq[T]] = {
profileStitchResults[Seq[T]](stitch, stat, _.size)
}
/**
* Helper function for timing a stitch and count optional results, returning the original stitch.
*/
def profileStitchOptionalResults[T](
stitch: Stitch[Option[T]],
stat: StatsReceiver
): Stitch[Option[T]] = {
profileStitchResults[Option[T]](stitch, stat, _.size)
}
/**
* Helper function for timing a stitch and count a map of results, returning the original stitch.
*/
def profileStitchMapResults[K, V](
stitch: Stitch[Map[K, V]],
stat: StatsReceiver
): Stitch[Map[K, V]] = {
profileStitchResults[Map[K, V]](stitch, stat, _.size)
}
def getCleanClassName(obj: Object): String =
obj.getClass.getSimpleName.stripSuffix("$")
/**
* Helper function for timing a stitch and count a list of PredicateResult
*/
def profilePredicateResults(
predicateResult: Stitch[Seq[PredicateResult]],
statsReceiver: StatsReceiver
): Stitch[Seq[PredicateResult]] = {
profileStitch[Seq[PredicateResult]](
predicateResult,
statsReceiver
).onSuccess {
_.map {
case PredicateResult.Valid =>
statsReceiver.counter(ValidCount).incr()
case PredicateResult.Invalid(reasons) =>
statsReceiver.counter(InvalidCount).incr()
reasons.map { filterReason =>
statsReceiver.counter(InvalidHasReasons).incr()
statsReceiver.scope(Reasons).counter(filterReason.reason).incr()
}
}
}
}
/**
* Helper function for timing a stitch and count individual PredicateResult
*/
def profilePredicateResult(
predicateResult: Stitch[PredicateResult],
statsReceiver: StatsReceiver
): Stitch[PredicateResult] = {
profilePredicateResults(
predicateResult.map(Seq(_)),
statsReceiver
).map(_.head)
}
/**
* Helper function for timing an arrow and count a list of PredicateResult
*/
def profilePredicateResults[Q](
predicateResult: Arrow[Q, Seq[PredicateResult]],
statsReceiver: StatsReceiver
): Arrow[Q, Seq[PredicateResult]] = {
profileArrow[Q, Seq[PredicateResult]](
predicateResult,
statsReceiver
).onSuccess {
_.map {
case PredicateResult.Valid =>
statsReceiver.counter(ValidCount).incr()
case PredicateResult.Invalid(reasons) =>
statsReceiver.counter(InvalidCount).incr()
reasons.map { filterReason =>
statsReceiver.counter(InvalidHasReasons).incr()
statsReceiver.scope(Reasons).counter(filterReason.reason).incr()
}
}
}
}
/**
* Helper function for timing an arrow and count individual PredicateResult
*/
def profilePredicateResult[Q](
predicateResult: Arrow[Q, PredicateResult],
statsReceiver: StatsReceiver
): Arrow[Q, PredicateResult] = {
profilePredicateResults(
predicateResult.map(Seq(_)),
statsReceiver
).map(_.head)
}
/**
* Helper function for timing a stitch code block
*/
def profileStitchSeqResults[T](
stats: StatsReceiver
)(
block: => Stitch[Seq[T]]
): Stitch[Seq[T]] = {
stats.counter(RequestName).incr()
profileStitch(stats.stat(LatencyName), TimeUnit.MILLISECONDS) {
block onSuccess { r =>
if (r.isEmpty) stats.counter(EmptyResultsName).incr()
stats.stat(ResultsStat).add(r.size)
} onFailure { e =>
{
stats.counter(FailureName).incr()
stats.scope(FailureName).counter(e.getClass.getName).incr()
}
}
}
}
/**
 * Times the asynchronous computation `f` and adds the elapsed time to `stat`.
 *
 * The elapsed time is recorded when the stitch produced by `f` responds. If
 * evaluating `f` itself throws a non-fatal exception, the elapsed time is
 * still recorded and the exception is returned as a failed stitch.
 *
 * @param stat stat that receives the elapsed time
 * @param unit unit the elapsed nanoseconds are converted to before recording
 * @param f    by-name computation to time
 * @tparam A result type of the stitch
 * @return the stitch produced by `f`, or `Stitch.exception` if `f` threw
 */
def profileStitch[A](stat: Stat, unit: TimeUnit)(f: => Stitch[A]): Stitch[A] = {
  val startedAtNanos = Stopwatch.timeNanos()

  // Shared by the normal and the exceptional path so both record the same way.
  def recordElapsed(): Unit =
    stat.add(unit.convert(Stopwatch.timeNanos() - startedAtNanos, TimeUnit.NANOSECONDS))

  try {
    f.respond(_ => recordElapsed())
  } catch {
    case NonFatal(thrown) =>
      recordElapsed()
      Stitch.exception(thrown)
  }
}
/**
 * Optionally feeds the outcome and run duration of `stitch` to a quality
 * factor observer and records the resulting quality factor.
 *
 * With no observer the stitch is returned untouched. With an observer the
 * stitch is timed via `Stitch.time`; the observer is applied to the outcome
 * and duration, the `QualityFactorCounts` counter is incremented, and the
 * observer's current quality factor multiplied by 10000 is added to
 * `QualityFactorStat`.
 *
 * @param stitch                      stitch to observe
 * @param qualityFactorObserverOption observer to notify, if any
 * @param statsReceiver               receiver for the quality factor metrics
 * @tparam T result type of the stitch
 * @return a stitch with the same outcome as `stitch`
 */
def observeStitchQualityFactor[T](
  stitch: Stitch[T],
  qualityFactorObserverOption: Option[QualityFactorObserver],
  statsReceiver: StatsReceiver
): Stitch[T] = {
  qualityFactorObserverOption match {
    case None =>
      stitch
    case Some(observer) =>
      Stitch
        .time(stitch)
        .map {
          case (outcome, elapsed) =>
            observer(outcome, elapsed)
            val scaledQualityFactor =
              observer.qualityFactor.currentValue.floatValue() * 10000
            statsReceiver.counter(QualityFactorCounts).incr()
            statsReceiver.stat(QualityFactorStat).add(scaledQualityFactor)
            outcome
        }
        // `Stitch.time` yields the outcome wrapped in a Try; unwrap it again.
        .lowerFromTry
  }
}
}

View File

@ -1,85 +0,0 @@
package com.twitter.follow_recommendations.common.base
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.stitch.Stitch
import com.twitter.timelines.configapi.HasParams
import com.twitter.timelines.configapi.Param
/**
 * Transforms a single candidate, or a list of candidates, for a target.
 *
 * `transform` is the only abstract member. `transformItem` has a default
 * implementation that delegates to `transform` with a one-element list.
 *
 * @tparam T target type
 * @tparam C candidate type
 */
trait Transform[-T, C] {

  /**
   * Transforms one candidate.
   *
   * The default wraps `item` in a one-element `Seq`, calls `transform` and
   * takes the head of the result; `head` throws if `transform` returned an
   * empty sequence.
   */
  def transformItem(target: T, item: C): Stitch[C] =
    transform(target, Seq(item)).map(transformed => transformed.head)

  /** Transforms a list of candidates for `target`. */
  def transform(target: T, items: Seq[C]): Stitch[Seq[C]]

  /**
   * Adapts this transform to a different target type by converting every
   * incoming target with `mapper` before delegating.
   */
  def mapTarget[T2](mapper: T2 => T): Transform[T2, C] = {
    val underlying = this
    new Transform[T2, C] {
      override def transform(target: T2, items: Seq[C]): Stitch[Seq[C]] =
        underlying.transform(mapper(target), items)

      override def transformItem(target: T2, item: C): Stitch[C] =
        underlying.transformItem(mapper(target), item)
    }
  }

  /**
   * Sequential composition: this transform runs first and its output is fed
   * to `other`.
   */
  def andThen[T1 <: T](other: Transform[T1, C]): Transform[T1, C] = {
    val first = this
    new Transform[T1, C] {
      override def transform(target: T1, items: Seq[C]): Stitch[Seq[C]] =
        first
          .transform(target, items)
          .flatMap(intermediate => other.transform(target, intermediate))

      override def transformItem(target: T1, item: C): Stitch[C] =
        first
          .transformItem(target, item)
          .flatMap(intermediate => other.transformItem(target, intermediate))
    }
  }

  /**
   * Wraps this transform so that input sizes are recorded on `statsReceiver`
   * and the delegated call is profiled through `StatsUtil`.
   */
  def observe(statsReceiver: StatsReceiver): Transform[T, C] = {
    val underlying = this
    new Transform[T, C] {
      override def transformItem(target: T, item: C): Stitch[C] = {
        statsReceiver.counter(Transform.InputCandidatesCount).incr()
        StatsUtil.profileStitch(underlying.transformItem(target, item), statsReceiver)
      }

      override def transform(target: T, items: Seq[C]): Stitch[Seq[C]] = {
        val inputSize = items.size
        statsReceiver.counter(Transform.InputCandidatesCount).incr(inputSize)
        statsReceiver.stat(Transform.InputCandidatesStat).add(inputSize)
        StatsUtil.profileStitchSeqResults(underlying.transform(target, items), statsReceiver)
      }
    }
  }
}
/**
 * A [[Transform]] whose target carries params, so that it can be switched on
 * or off per target through a boolean param.
 */
trait GatedTransform[T <: HasParams, C] extends Transform[T, C] {

  /**
   * Builds a gated view of this transform.
   *
   * @param param boolean param looked up in `target.params` on every call
   * @return a transform that delegates to this one when `param` is true for
   *         the target and otherwise returns the items unchanged
   */
  def gated(param: Param[Boolean]): Transform[T, C] = {
    val ungated = this
    new Transform[T, C] {
      override def transform(target: T, items: Seq[C]): Stitch[Seq[C]] =
        if (target.params(param)) ungated.transform(target, items)
        else Stitch.value(items)
    }
  }
}
/** Metric names recorded by the wrapper returned from `Transform.observe`. */
object Transform {

  /** Name of the counter incremented by the number of input candidates. */
  val InputCandidatesCount: String = "input_candidates"

  /** Name of the stat that receives the size of each input candidate list. */
  val InputCandidatesStat: String = "input_candidates_stat"
}
/**
 * A [[Transform]] that returns its input candidates unchanged.
 *
 * @tparam T target type (ignored)
 * @tparam C candidate type
 */
class IdentityTransform[T, C] extends Transform[T, C] {

  /** Returns `items` as-is, wrapped with `Stitch.value`; `target` is not consulted. */
  override def transform(target: T, items: Seq[C]): Stitch[Seq[C]] = {
    val unchanged = items
    Stitch.value(unchanged)
  }
}

View File

@ -1,9 +0,0 @@
package com.twitter.follow_recommendations.common.candidate_sources.addressbook
import com.twitter.timelines.configapi.FSParam
/**
 * Params for the candidate sources in the `candidate_sources.addressbook` package.
 */
object AddressBookParams {
// Used by display locations that want to read only from the ABV2 Client and ignore Manhattan.
// Currently the only display location that does this is the ABUploadInjection DisplayLocation.
// Boolean FSParam declared with the name "addressbook_read_only_from_abv2" and `false` as its
// second constructor argument (presumably the default value — confirm against FSParam).
object ReadFromABV2Only extends FSParam[Boolean]("addressbook_read_only_from_abv2", false)
}

View File

@ -1,27 +0,0 @@
# Scala library target. No explicit `name` or `sources` is given, so whatever
# defaults the build tool applies are used.
# NOTE(review): its position in the diff suggests this is the BUILD file of the
# candidate_sources/addressbook package — confirm.
scala_library(
# Opts this target in to the "fatal_warnings" compiler option set.
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
# Third-party JVM artifacts first, then in-repo source targets, thrift and strato targets.
dependencies = [
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/com/google/inject/extensions:guice-assistedinject",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/slf4j:slf4j-api",
"finatra/inject/inject-core/src/main/scala",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/base",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/candidate_sources/base",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/addressbook",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/email_storage_service",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/gizmoduck",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/phone_storage_service",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/strato",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/constants",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
"follow-recommendations-service/server/src/main/scala/com/twitter/follow_recommendations/configapi/deciders",
"src/thrift/com/twitter/hermit/candidate:hermit-candidate-scala",
"src/thrift/com/twitter/hermit/usercontacts:hermit-usercontacts-scala",
"strato/config/columns/onboarding/userrecs:userrecs-strato-client",
"strato/src/main/scala/com/twitter/strato/client",
"util/util-slf4j-api/src/main/scala",
],
)

View File

@ -1,74 +0,0 @@
package com.twitter.follow_recommendations.common.candidate_sources.addressbook
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.follow_recommendations.common.candidate_sources.addressbook.AddressBookParams.ReadFromABV2Only
import com.twitter.follow_recommendations.common.clients.addressbook.AddressbookClient
import com.twitter.follow_recommendations.common.clients.addressbook.models.EdgeType
import com.twitter.follow_recommendations.common.clients.addressbook.models.RecordIdentifier
import com.twitter.follow_recommendations.common.models.CandidateUser
import com.twitter.follow_recommendations.common.utils.RescueWithStatsUtils.rescueWithStats
import com.twitter.hermit.model.Algorithm
import com.twitter.product_mixer.core.functional_component.candidate_source.CandidateSource
import com.twitter.product_mixer.core.model.common.identifier.CandidateSourceIdentifier
import com.twitter.product_mixer.core.model.marshalling.request.HasClientContext
import com.twitter.stitch.Stitch
import com.twitter.strato.generated.client.onboarding.userrecs.ForwardEmailBookClientColumn
import com.twitter.timelines.configapi.HasParams
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Candidate source that turns the target user's forward email address book
 * entries into [[CandidateUser]]s.
 *
 * Stats are scoped under this class's simple name.
 */
@Singleton
class ForwardEmailBookSource @Inject() (
  forwardEmailBookClientColumn: ForwardEmailBookClientColumn,
  addressBookClient: AddressbookClient,
  statsReceiver: StatsReceiver = NullStatsReceiver)
    extends CandidateSource[HasParams with HasClientContext, CandidateUser] {

  override val identifier: CandidateSourceIdentifier =
    ForwardEmailBookSource.Identifier

  private val stats: StatsReceiver = statsReceiver.scope(this.getClass.getSimpleName)

  /**
   * Generates the list of candidates for the target.
   *
   * A target without a user id yields `Stitch.Nil`. Otherwise the address
   * book client is queried for that user (wrapped in `rescueWithStats`), the
   * first `NumEmailBookEntries` ids are kept, and each one becomes a
   * [[CandidateUser]] with the default candidate score, tagged with this
   * source's identifier.
   */
  override def apply(
    target: HasParams with HasClientContext
  ): Stitch[Seq[CandidateUser]] = {
    val contactIds: Stitch[Seq[Long]] = target.getOptionalUserId match {
      case Some(userId) =>
        rescueWithStats(
          addressBookClient.getUsers(
            userId = userId,
            identifiers =
              Seq(RecordIdentifier(userId = Some(userId), email = None, phoneNumber = None)),
            batchSize = AddressbookClient.AddressBook2BatchSize,
            edgeType = ForwardEmailBookSource.DefaultEdgeType,
            // No fetcher is supplied when the target asks to read from ABV2 only.
            fetcherOption =
              if (!target.params(ReadFromABV2Only)) Some(forwardEmailBookClientColumn.fetcher)
              else None,
            queryOption = AddressbookClient.createQueryOption(
              edgeType = ForwardEmailBookSource.DefaultEdgeType,
              isPhone = ForwardEmailBookSource.IsPhone)
          ),
          stats,
          "AddressBookClient"
        )
      case None =>
        Stitch.Nil
    }

    contactIds.map { ids =>
      ids
        .take(ForwardEmailBookSource.NumEmailBookEntries)
        .map { contactId =>
          CandidateUser(contactId, score = Some(CandidateUser.DefaultCandidateScore))
            .withCandidateSource(identifier)
        }
    }
  }
}
/** Constants used by the `ForwardEmailBookSource` class. */
object ForwardEmailBookSource {

  /** Source identifier built from the string form of `Algorithm.ForwardEmailBook`. */
  val Identifier: CandidateSourceIdentifier =
    CandidateSourceIdentifier(Algorithm.ForwardEmailBook.toString)

  /** Maximum number of address book entries turned into candidates. */
  val NumEmailBookEntries: Int = 1000

  /** Value passed as `isPhone` when the address book query option is created. */
  val IsPhone: Boolean = false

  /** Edge type used for the address book lookup and its query option. */
  val DefaultEdgeType: EdgeType = EdgeType.Forward
}

Some files were not shown because too many files have changed in this diff Show More