From 94ff4caea8daa19f28e2526d12f7770f0bcbc693 Mon Sep 17 00:00:00 2001 From: twitter-team <> Date: Mon, 10 Apr 2023 13:54:30 -0700 Subject: [PATCH] Open-sourcing Topic Social Proof Service Topic Social Proof Service (TSPS) delivers highly relevant topics tailored to a user's interests by analyzing topic preferences, such as following or unfollowing, and employing semantic annotations and other machine learning models. --- topic-social-proof/README.md | 8 + topic-social-proof/server/BUILD | 24 + .../server/src/main/resources/BUILD | 8 + .../src/main/resources/config/decider.yml | 61 ++ .../server/src/main/resources/logback.xml | 155 +++++ .../src/main/scala/com/twitter/tsp/BUILD | 12 + .../tsp/TopicSocialProofStratoFedServer.scala | 56 ++ .../main/scala/com/twitter/tsp/columns/BUILD | 12 + .../columns/TopicSocialProofBatchColumn.scala | 84 +++ .../tsp/columns/TopicSocialProofColumn.scala | 47 ++ .../main/scala/com/twitter/tsp/common/BUILD | 23 + .../twitter/tsp/common/DeciderConstants.scala | 19 + .../tsp/common/FeatureSwitchesBuilder.scala | 34 + .../com/twitter/tsp/common/LoadShedder.scala | 44 ++ .../twitter/tsp/common/ParamsBuilder.scala | 98 +++ .../twitter/tsp/common/RecTargetFactory.scala | 65 ++ .../tsp/common/TopicSocialProofDecider.scala | 26 + .../tsp/common/TopicSocialProofParams.scala | 104 ++++ .../main/scala/com/twitter/tsp/handlers/BUILD | 14 + .../handlers/TopicSocialProofHandler.scala | 587 ++++++++++++++++++ .../handlers/UttChildrenWarmupHandler.scala | 40 ++ .../main/scala/com/twitter/tsp/modules/BUILD | 30 + .../tsp/modules/GizmoduckUserModule.scala | 35 ++ .../RepresentationScorerStoreModule.scala | 47 ++ .../tsp/modules/TSPClientIdModule.scala | 14 + .../tsp/modules/TopicListingModule.scala | 17 + .../modules/TopicSocialProofStoreModule.scala | 68 ++ ...CosineSimilarityAggregateStoreModule.scala | 26 + .../tsp/modules/TweetInfoStoreModule.scala | 130 ++++ .../tsp/modules/TweetyPieClientModule.scala | 63 ++ .../tsp/modules/UnifiedCacheClient.scala | 33 + .../twitter/tsp/modules/UttClientModule.scala | 41 ++ .../tsp/modules/UttLocalizationModule.scala | 27 + .../main/scala/com/twitter/tsp/service/BUILD | 23 + .../tsp/service/TopicSocialProofService.scala | 182 ++++++ .../main/scala/com/twitter/tsp/stores/BUILD | 32 + ...LocalizedUttRecommendableTopicsStore.scala | 30 + .../stores/RepresentationScorerStore.scala | 31 + .../stores/SemanticCoreAnnotationStore.scala | 64 ++ .../tsp/stores/TopicSocialProofStore.scala | 127 ++++ .../com/twitter/tsp/stores/TopicStore.scala | 135 ++++ ...TweetsCosineSimilarityAggregateStore.scala | 99 +++ .../twitter/tsp/stores/TweetInfoStore.scala | 230 +++++++ .../tsp/stores/UttTopicFilterStore.scala | 248 ++++++++ .../main/scala/com/twitter/tsp/utils/BUILD | 14 + .../com/twitter/tsp/utils/LZ4Injection.scala | 19 + .../ReadableStoreWithMapOptionValues.scala | 20 + .../tsp/utils/SeqObjectInjection.scala | 32 + .../server/src/main/thrift/BUILD | 21 + .../server/src/main/thrift/service.thrift | 104 ++++ .../server/src/main/thrift/tweet_info.thrift | 26 + 51 files changed, 3489 insertions(+) create mode 100644 topic-social-proof/README.md create mode 100644 topic-social-proof/server/BUILD create mode 100644 topic-social-proof/server/src/main/resources/BUILD create mode 100644 topic-social-proof/server/src/main/resources/config/decider.yml create mode 100644 topic-social-proof/server/src/main/resources/logback.xml create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/BUILD create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/TopicSocialProofStratoFedServer.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/columns/BUILD create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/columns/TopicSocialProofBatchColumn.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/columns/TopicSocialProofColumn.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/common/BUILD create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/common/DeciderConstants.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/common/FeatureSwitchesBuilder.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/common/LoadShedder.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/common/ParamsBuilder.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/common/RecTargetFactory.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/common/TopicSocialProofDecider.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/common/TopicSocialProofParams.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/BUILD create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/TopicSocialProofHandler.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/UttChildrenWarmupHandler.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/BUILD create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/GizmoduckUserModule.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/RepresentationScorerStoreModule.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TSPClientIdModule.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TopicListingModule.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TopicSocialProofStoreModule.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TopicTweetCosineSimilarityAggregateStoreModule.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TweetInfoStoreModule.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TweetyPieClientModule.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/UnifiedCacheClient.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/UttClientModule.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/UttLocalizationModule.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/service/BUILD create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/service/TopicSocialProofService.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/BUILD create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/LocalizedUttRecommendableTopicsStore.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/RepresentationScorerStore.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/SemanticCoreAnnotationStore.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TopicSocialProofStore.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TopicStore.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TopicTweetsCosineSimilarityAggregateStore.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TweetInfoStore.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/UttTopicFilterStore.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/BUILD create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/LZ4Injection.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/ReadableStoreWithMapOptionValues.scala create mode 100644 topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/SeqObjectInjection.scala create mode 100644 topic-social-proof/server/src/main/thrift/BUILD create mode 100644 topic-social-proof/server/src/main/thrift/service.thrift create mode 100644 topic-social-proof/server/src/main/thrift/tweet_info.thrift diff --git a/topic-social-proof/README.md b/topic-social-proof/README.md new file mode 100644 index 000000000..d98b7ba3b --- /dev/null +++ b/topic-social-proof/README.md @@ -0,0 +1,8 @@ +# Topic Social Proof Service (TSPS) +================= + +**Topic Social Proof Service** (TSPS) serves as a centralized source for verifying topics related to Timelines and Notifications. By analyzing user's topic preferences, such as following or unfollowing, and employing semantic annotations and tweet embeddings from SimClusters, or other machine learning models, TSPS delivers highly relevant topics tailored to each user's interests. + +For instance, when a tweet discusses Stephen Curry, the service determines if the content falls under topics like "NBA" and/or "Golden State Warriors" while also providing relevance scores based on SimClusters Embedding. Additionally, TSPS evaluates user-specific topic preferences to offer a comprehensive list of available topics, only those the user is currently following, or new topics they have not followed but may find interesting if recommended on specific product surfaces. + + diff --git a/topic-social-proof/server/BUILD b/topic-social-proof/server/BUILD new file mode 100644 index 000000000..9fb977d17 --- /dev/null +++ b/topic-social-proof/server/BUILD @@ -0,0 +1,24 @@ +jvm_binary( + name = "bin", + basename = "topic-social-proof", + main = "com.twitter.tsp.TopicSocialProofStratoFedServerMain", + runtime_platform = "java11", + tags = [ + "bazel-compatible", + ], + dependencies = [ + "strato/src/main/scala/com/twitter/strato/logging/logback", + "topic-social-proof/server/src/main/resources", + "topic-social-proof/server/src/main/scala/com/twitter/tsp", + ], +) + +# Aurora Workflows build phase convention requires a jvm_app named with ${project-name}-app +jvm_app( + name = "topic-social-proof-app", + archive = "zip", + binary = ":bin", + tags = [ + "bazel-compatible", + ], +) diff --git a/topic-social-proof/server/src/main/resources/BUILD b/topic-social-proof/server/src/main/resources/BUILD new file mode 100644 index 000000000..8f96f402c --- /dev/null +++ b/topic-social-proof/server/src/main/resources/BUILD @@ -0,0 +1,8 @@ +resources( + sources = [ + "*.xml", + "*.yml", + "config/*.yml", + ], + tags = ["bazel-compatible"], +) diff --git a/topic-social-proof/server/src/main/resources/config/decider.yml b/topic-social-proof/server/src/main/resources/config/decider.yml new file mode 100644 index 000000000..c40dd7080 --- /dev/null +++ b/topic-social-proof/server/src/main/resources/config/decider.yml @@ -0,0 +1,61 @@ +# Keys are sorted in an alphabetical order + +enable_topic_social_proof_score: + comment : "Enable the calculation of cosine similarity score in TopicSocialProofStore. 0 means do not calculate the score and use a random rank to generate topic social proof" + default_availability: 0 + +enable_tweet_health_score: + comment: "Enable the calculation for health scores in tweetInfo. By enabling this decider, we will compute TweetHealthModelScore" + default_availability: 0 + +enable_user_agatha_score: + comment: "Enable the calculation for health scores in tweetInfo. By enabling this decider, we will compute UserHealthModelScore" + default_availability: 0 + +enable_loadshedding_HomeTimeline: + comment: "Enable loadshedding (from 0% to 100%). Requests that have been shed will return an empty response" + default_availability: 0 + +enable_loadshedding_HomeTimelineTopicTweets: + comment: "Enable loadshedding (from 0% to 100%). Requests that have been shed will return an empty response" + default_availability: 0 + +enable_loadshedding_HomeTimelineRecommendTopicTweets: + comment: "Enable loadshedding (from 0% to 100%). Requests that have been shed will return an empty response" + default_availability: 0 + +enable_loadshedding_MagicRecsRecommendTopicTweets: + comment: "Enable loadshedding (from 0% to 100%). Requests that have been shed will return an empty response" + default_availability: 0 + +enable_loadshedding_TopicLandingPage: + comment: "Enable loadshedding (from 0% to 100%). Requests that have been shed will return an empty response" + default_availability: 0 + +enable_loadshedding_HomeTimelineFeatures: + comment: "Enable loadshedding (from 0% to 100%). Requests that have been shed will return an empty response" + default_availability: 0 + +enable_loadshedding_HomeTimelineTopicTweetsMetrics: + comment: "Enable loadshedding (from 0% to 100%). Requests that have been shed will return an empty response" + default_availability: 0 + +enable_loadshedding_HomeTimelineUTEGTopicTweets: + comment: "Enable loadshedding (from 0% to 100%). Requests that have been shed will return an empty response" + default_availability: 0 + +enable_loadshedding_HomeTimelineSimClusters: + comment: "Enable loadshedding (from 0% to 100%). Requests that have been shed will return an empty response" + default_availability: 0 + +enable_loadshedding_ExploreTopicTweets: + comment: "Enable loadshedding (from 0% to 100%). Requests that have been shed will return an empty response" + default_availability: 0 + +enable_loadshedding_MagicRecsTopicTweets: + comment: "Enable loadshedding (from 0% to 100%). Requests that have been shed will return an empty response" + default_availability: 0 + +enable_loadshedding_Search: + comment: "Enable loadshedding (from 0% to 100%). Requests that have been shed will return an empty response" + default_availability: 0 diff --git a/topic-social-proof/server/src/main/resources/logback.xml b/topic-social-proof/server/src/main/resources/logback.xml new file mode 100644 index 000000000..d08b0a965 --- /dev/null +++ b/topic-social-proof/server/src/main/resources/logback.xml @@ -0,0 +1,155 @@ + + + + + + + + + + + + + + + + + + + + + + true + + + + + ${log.service.output} + + ${log.service.output}.%i + 1 + 10 + + + 50MB + + + %date %.-3level ${DEFAULT_SERVICE_PATTERN}%n + + + + + + ${log.strato_only.output} + + ${log.strato_only.output}.%i + 1 + 10 + + + 50MB + + + %date %.-3level ${DEFAULT_SERVICE_PATTERN}%n + + + + + + true + loglens + ${log.lens.index} + ${log.lens.tag}/service + + %msg%n + + + 500 + 50 + + + manhattan-client + .*InvalidRequest.* + + + + + + + + + ${async_queue_size} + ${async_max_flush_time} + + + + + ${async_queue_size} + ${async_max_flush_time} + + + + + ${async_queue_size} + ${async_max_flush_time} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/BUILD b/topic-social-proof/server/src/main/scala/com/twitter/tsp/BUILD new file mode 100644 index 000000000..2052c5047 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/BUILD @@ -0,0 +1,12 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = [ + "bazel-compatible", + ], + dependencies = [ + "finatra/inject/inject-thrift-client", + "strato/src/main/scala/com/twitter/strato/fed", + "strato/src/main/scala/com/twitter/strato/fed/server", + "topic-social-proof/server/src/main/scala/com/twitter/tsp/columns", + ], +) diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/TopicSocialProofStratoFedServer.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/TopicSocialProofStratoFedServer.scala new file mode 100644 index 000000000..22d3c19f0 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/TopicSocialProofStratoFedServer.scala @@ -0,0 +1,56 @@ +package com.twitter.tsp + +import com.google.inject.Module +import com.twitter.strato.fed._ +import com.twitter.strato.fed.server._ +import com.twitter.strato.warmup.Warmer +import com.twitter.tsp.columns.TopicSocialProofColumn +import com.twitter.tsp.columns.TopicSocialProofBatchColumn +import com.twitter.tsp.handlers.UttChildrenWarmupHandler +import com.twitter.tsp.modules.RepresentationScorerStoreModule +import com.twitter.tsp.modules.GizmoduckUserModule +import com.twitter.tsp.modules.TSPClientIdModule +import com.twitter.tsp.modules.TopicListingModule +import com.twitter.tsp.modules.TopicSocialProofStoreModule +import com.twitter.tsp.modules.TopicTweetCosineSimilarityAggregateStoreModule +import com.twitter.tsp.modules.TweetInfoStoreModule +import com.twitter.tsp.modules.TweetyPieClientModule +import com.twitter.tsp.modules.UttClientModule +import com.twitter.tsp.modules.UttLocalizationModule +import com.twitter.util.Future + +object TopicSocialProofStratoFedServerMain extends TopicSocialProofStratoFedServer + +trait TopicSocialProofStratoFedServer extends StratoFedServer { + override def dest: String = "/s/topic-social-proof/topic-social-proof" + + override val modules: Seq[Module] = + Seq( + GizmoduckUserModule, + RepresentationScorerStoreModule, + TopicSocialProofStoreModule, + TopicListingModule, + TopicTweetCosineSimilarityAggregateStoreModule, + TSPClientIdModule, + TweetInfoStoreModule, + TweetyPieClientModule, + UttClientModule, + UttLocalizationModule + ) + + override def columns: Seq[Class[_ <: StratoFed.Column]] = + Seq( + classOf[TopicSocialProofColumn], + classOf[TopicSocialProofBatchColumn] + ) + + override def configureWarmer(warmer: Warmer): Unit = { + warmer.add( + "uttChildrenWarmupHandler", + () => { + handle[UttChildrenWarmupHandler]() + Future.Unit + } + ) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/columns/BUILD b/topic-social-proof/server/src/main/scala/com/twitter/tsp/columns/BUILD new file mode 100644 index 000000000..c29b7ea35 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/columns/BUILD @@ -0,0 +1,12 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = [ + "bazel-compatible", + ], + dependencies = [ + "stitch/stitch-storehaus", + "strato/src/main/scala/com/twitter/strato/fed", + "topic-social-proof/server/src/main/scala/com/twitter/tsp/service", + "topic-social-proof/server/src/main/thrift:thrift-scala", + ], +) diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/columns/TopicSocialProofBatchColumn.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/columns/TopicSocialProofBatchColumn.scala new file mode 100644 index 000000000..f451e662a --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/columns/TopicSocialProofBatchColumn.scala @@ -0,0 +1,84 @@ +package com.twitter.tsp.columns + +import com.twitter.stitch.SeqGroup +import com.twitter.stitch.Stitch +import com.twitter.strato.catalog.Fetch +import com.twitter.strato.catalog.OpMetadata +import com.twitter.strato.config._ +import com.twitter.strato.config.AllowAll +import com.twitter.strato.config.ContactInfo +import com.twitter.strato.config.Policy +import com.twitter.strato.data.Conv +import com.twitter.strato.data.Description.PlainText +import com.twitter.strato.data.Lifecycle.Production +import com.twitter.strato.fed.StratoFed +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.tsp.thriftscala.TopicSocialProofRequest +import com.twitter.tsp.thriftscala.TopicSocialProofOptions +import com.twitter.tsp.service.TopicSocialProofService +import com.twitter.tsp.thriftscala.TopicWithScore +import com.twitter.util.Future +import com.twitter.util.Try +import javax.inject.Inject + +class TopicSocialProofBatchColumn @Inject() ( + topicSocialProofService: TopicSocialProofService) + extends StratoFed.Column(TopicSocialProofBatchColumn.Path) + with StratoFed.Fetch.Stitch { + + override val policy: Policy = + ReadWritePolicy( + readPolicy = AllowAll, + writePolicy = AllowKeyAuthenticatedTwitterUserId + ) + + override type Key = Long + override type View = TopicSocialProofOptions + override type Value = Seq[TopicWithScore] + + override val keyConv: Conv[Key] = Conv.ofType + override val viewConv: Conv[View] = ScroogeConv.fromStruct[TopicSocialProofOptions] + override val valueConv: Conv[Value] = Conv.seq(ScroogeConv.fromStruct[TopicWithScore]) + override val metadata: OpMetadata = + OpMetadata( + lifecycle = Some(Production), + Some(PlainText("Topic Social Proof Batched Federated Column"))) + + case class TspsGroup(view: View) extends SeqGroup[Long, Fetch.Result[Value]] { + override protected def run(keys: Seq[Long]): Future[Seq[Try[Result[Seq[TopicWithScore]]]]] = { + val request = TopicSocialProofRequest( + userId = view.userId, + tweetIds = keys.toSet, + displayLocation = view.displayLocation, + topicListingSetting = view.topicListingSetting, + context = view.context, + bypassModes = view.bypassModes, + tags = view.tags + ) + + val response = topicSocialProofService + .topicSocialProofHandlerStoreStitch(request) + .map(_.socialProofs) + Stitch + .run(response).map(r => + keys.map(key => { + Try { + val v = r.get(key) + if (v.nonEmpty && v.get.nonEmpty) { + found(v.get) + } else { + missing + } + } + })) + } + } + + override def fetch(key: Key, view: View): Stitch[Result[Value]] = { + Stitch.call(key, TspsGroup(view)) + } +} + +object TopicSocialProofBatchColumn { + val Path = "topic-signals/tsp/topic-social-proof-batched" +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/columns/TopicSocialProofColumn.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/columns/TopicSocialProofColumn.scala new file mode 100644 index 000000000..10425eccb --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/columns/TopicSocialProofColumn.scala @@ -0,0 +1,47 @@ +package com.twitter.tsp.columns + +import com.twitter.stitch +import com.twitter.stitch.Stitch +import com.twitter.strato.catalog.OpMetadata +import com.twitter.strato.config._ +import com.twitter.strato.config.AllowAll +import com.twitter.strato.config.ContactInfo +import com.twitter.strato.config.Policy +import com.twitter.strato.data.Conv +import com.twitter.strato.data.Description.PlainText +import com.twitter.strato.data.Lifecycle.Production +import com.twitter.strato.fed.StratoFed +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.tsp.thriftscala.TopicSocialProofRequest +import com.twitter.tsp.thriftscala.TopicSocialProofResponse +import com.twitter.tsp.service.TopicSocialProofService +import javax.inject.Inject + +class TopicSocialProofColumn @Inject() ( + topicSocialProofService: TopicSocialProofService) + extends StratoFed.Column(TopicSocialProofColumn.Path) + with StratoFed.Fetch.Stitch { + + override type Key = TopicSocialProofRequest + override type View = Unit + override type Value = TopicSocialProofResponse + + override val keyConv: Conv[Key] = ScroogeConv.fromStruct[TopicSocialProofRequest] + override val viewConv: Conv[View] = Conv.ofType + override val valueConv: Conv[Value] = ScroogeConv.fromStruct[TopicSocialProofResponse] + override val metadata: OpMetadata = + OpMetadata(lifecycle = Some(Production), Some(PlainText("Topic Social Proof Federated Column"))) + + override def fetch(key: Key, view: View): Stitch[Result[Value]] = { + topicSocialProofService + .topicSocialProofHandlerStoreStitch(key) + .map { result => found(result) } + .handle { + case stitch.NotFound => missing + } + } +} + +object TopicSocialProofColumn { + val Path = "topic-signals/tsp/topic-social-proof" +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/BUILD b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/BUILD new file mode 100644 index 000000000..7b5fda3b0 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/BUILD @@ -0,0 +1,23 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = [ + "bazel-compatible", + ], + dependencies = [ + "configapi/configapi-abdecider", + "configapi/configapi-core", + "content-recommender/thrift/src/main/thrift:thrift-scala", + "decider/src/main/scala", + "discovery-common/src/main/scala/com/twitter/discovery/common/configapi", + "featureswitches/featureswitches-core", + "finatra/inject/inject-core/src/main/scala", + "frigate/frigate-common:base", + "frigate/frigate-common:util", + "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/candidate", + "interests-service/thrift/src/main/thrift:thrift-scala", + "src/scala/com/twitter/simclusters_v2/common", + "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala", + "stitch/stitch-storehaus", + "topic-social-proof/server/src/main/thrift:thrift-scala", + ], +) diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/DeciderConstants.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/DeciderConstants.scala new file mode 100644 index 000000000..de025128d --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/DeciderConstants.scala @@ -0,0 +1,19 @@ +package com.twitter.tsp.common + +import com.twitter.servo.decider.DeciderKeyEnum + +object DeciderConstants { + val enableTopicSocialProofScore = "enable_topic_social_proof_score" + val enableHealthSignalsScoreDeciderKey = "enable_tweet_health_score" + val enableUserAgathaScoreDeciderKey = "enable_user_agatha_score" +} + +object DeciderKey extends DeciderKeyEnum { + + val enableHealthSignalsScoreDeciderKey: Value = Value( + DeciderConstants.enableHealthSignalsScoreDeciderKey + ) + val enableUserAgathaScoreDeciderKey: Value = Value( + DeciderConstants.enableUserAgathaScoreDeciderKey + ) +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/FeatureSwitchesBuilder.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/FeatureSwitchesBuilder.scala new file mode 100644 index 000000000..a3b269cba --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/FeatureSwitchesBuilder.scala @@ -0,0 +1,34 @@ +package com.twitter.tsp.common + +import com.twitter.abdecider.LoggingABDecider +import com.twitter.featureswitches.v2.FeatureSwitches +import com.twitter.featureswitches.v2.builder.{FeatureSwitchesBuilder => FsBuilder} +import com.twitter.featureswitches.v2.experimentation.NullBucketImpressor +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.util.Duration + +case class FeatureSwitchesBuilder( + statsReceiver: StatsReceiver, + abDecider: LoggingABDecider, + featuresDirectory: String, + addServiceDetailsFromAurora: Boolean, + configRepoDirectory: String = "/usr/local/config", + fastRefresh: Boolean = false, + impressExperiments: Boolean = true) { + + def build(): FeatureSwitches = { + val featureSwitches = FsBuilder() + .abDecider(abDecider) + .statsReceiver(statsReceiver) + .configRepoAbsPath(configRepoDirectory) + .featuresDirectory(featuresDirectory) + .limitToReferencedExperiments(shouldLimit = true) + .experimentImpressionStatsEnabled(true) + + if (!impressExperiments) featureSwitches.experimentBucketImpressor(NullBucketImpressor) + if (addServiceDetailsFromAurora) featureSwitches.serviceDetailsFromAurora() + if (fastRefresh) featureSwitches.refreshPeriod(Duration.fromSeconds(10)) + + featureSwitches.build() + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/LoadShedder.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/LoadShedder.scala new file mode 100644 index 000000000..2071ea07e --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/LoadShedder.scala @@ -0,0 +1,44 @@ +package com.twitter.tsp.common + +import com.twitter.decider.Decider +import com.twitter.decider.RandomRecipient +import com.twitter.util.Future +import javax.inject.Inject +import scala.util.control.NoStackTrace + +/* + Provides deciders-controlled load shedding for a given displayLocation + The format of the decider keys is: + + enable_loadshedding_ + E.g.: + enable_loadshedding_HomeTimeline + + Deciders are fractional, so a value of 50.00 will drop 50% of responses. If a decider key is not + defined for a particular displayLocation, those requests will always be served. + + We should therefore aim to define keys for the locations we care most about in decider.yml, + so that we can control them during incidents. + */ +class LoadShedder @Inject() (decider: Decider) { + import LoadShedder._ + + // Fall back to False for any undefined key + private val deciderWithFalseFallback: Decider = decider.orElse(Decider.False) + private val keyPrefix = "enable_loadshedding" + + def apply[T](typeString: String)(serve: => Future[T]): Future[T] = { + /* + Per-typeString level load shedding: enable_loadshedding_HomeTimeline + Checks if per-typeString load shedding is enabled + */ + val keyTyped = s"${keyPrefix}_$typeString" + if (deciderWithFalseFallback.isAvailable(keyTyped, recipient = Some(RandomRecipient))) + Future.exception(LoadSheddingException) + else serve + } +} + +object LoadShedder { + object LoadSheddingException extends Exception with NoStackTrace +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/ParamsBuilder.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/ParamsBuilder.scala new file mode 100644 index 000000000..93fe9cbaf --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/ParamsBuilder.scala @@ -0,0 +1,98 @@ +package com.twitter.tsp.common + +import com.twitter.abdecider.LoggingABDecider +import com.twitter.abdecider.UserRecipient +import com.twitter.contentrecommender.thriftscala.DisplayLocation +import com.twitter.discovery.common.configapi.FeatureContextBuilder +import com.twitter.featureswitches.FSRecipient +import com.twitter.featureswitches.Recipient +import com.twitter.featureswitches.UserAgent +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.interests.thriftscala.TopicListingViewerContext +import com.twitter.timelines.configapi +import com.twitter.timelines.configapi.Params +import com.twitter.timelines.configapi.RequestContext +import com.twitter.timelines.configapi.abdecider.LoggingABDeciderExperimentContext + +case class ParamsBuilder( + featureContextBuilder: FeatureContextBuilder, + abDecider: LoggingABDecider, + overridesConfig: configapi.Config, + statsReceiver: StatsReceiver) { + + def buildFromTopicListingViewerContext( + topicListingViewerContext: Option[TopicListingViewerContext], + displayLocation: DisplayLocation, + userRoleOverride: Option[Set[String]] = None + ): Params = { + + topicListingViewerContext.flatMap(_.userId) match { + case Some(userId) => + val userRecipient = ParamsBuilder.toFeatureSwitchRecipientWithTopicContext( + userId, + userRoleOverride, + topicListingViewerContext, + Some(displayLocation) + ) + + overridesConfig( + requestContext = RequestContext( + userId = Some(userId), + experimentContext = LoggingABDeciderExperimentContext( + abDecider, + Some(UserRecipient(userId, Some(userId)))), + featureContext = featureContextBuilder( + Some(userId), + Some(userRecipient) + ) + ), + statsReceiver + ) + case _ => + throw new IllegalArgumentException( + s"${this.getClass.getSimpleName} tried to build Param for a request without a userId" + ) + } + } +} + +object ParamsBuilder { + + def toFeatureSwitchRecipientWithTopicContext( + userId: Long, + userRolesOverride: Option[Set[String]], + context: Option[TopicListingViewerContext], + displayLocationOpt: Option[DisplayLocation] + ): Recipient = { + val userRoles = userRolesOverride match { + case Some(overrides) => Some(overrides) + case _ => context.flatMap(_.userRoles.map(_.toSet)) + } + + val recipient = FSRecipient( + userId = Some(userId), + userRoles = userRoles, + deviceId = context.flatMap(_.deviceId), + guestId = context.flatMap(_.guestId), + languageCode = context.flatMap(_.languageCode), + countryCode = context.flatMap(_.countryCode), + userAgent = context.flatMap(_.userAgent).flatMap(UserAgent(_)), + isVerified = None, + isTwoffice = None, + tooClient = None, + highWaterMark = None + ) + displayLocationOpt match { + case Some(displayLocation) => + recipient.withCustomFields(displayLocationCustomFieldMap(displayLocation)) + case None => + recipient + } + } + + private val DisplayLocationCustomField = "display_location" + + def displayLocationCustomFieldMap(displayLocation: DisplayLocation): (String, String) = + DisplayLocationCustomField -> displayLocation.toString + +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/RecTargetFactory.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/RecTargetFactory.scala new file mode 100644 index 000000000..26eeda736 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/RecTargetFactory.scala @@ -0,0 +1,65 @@ +package com.twitter.tsp.common + +import com.twitter.abdecider.LoggingABDecider +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.common.base.TargetUser +import com.twitter.frigate.common.candidate.TargetABDecider +import com.twitter.frigate.common.util.ABDeciderWithOverride +import com.twitter.gizmoduck.thriftscala.User +import com.twitter.simclusters_v2.common.UserId +import com.twitter.storehaus.ReadableStore +import com.twitter.timelines.configapi.Params +import com.twitter.tsp.thriftscala.TopicSocialProofRequest +import com.twitter.util.Future + +case class DefaultRecTopicSocialProofTarget( + topicSocialProofRequest: TopicSocialProofRequest, + targetId: UserId, + user: Option[User], + abDecider: ABDeciderWithOverride, + params: Params +)( + implicit statsReceiver: StatsReceiver) + extends TargetUser + with TopicSocialProofRecRequest + with TargetABDecider { + override def globalStats: StatsReceiver = statsReceiver + override val targetUser: Future[Option[User]] = Future.value(user) +} + +trait TopicSocialProofRecRequest { + tuc: TargetUser => + + val topicSocialProofRequest: TopicSocialProofRequest +} + +case class RecTargetFactory( + abDecider: LoggingABDecider, + userStore: ReadableStore[UserId, User], + paramBuilder: ParamsBuilder, + statsReceiver: StatsReceiver) { + + type RecTopicSocialProofTarget = DefaultRecTopicSocialProofTarget + + def buildRecTopicSocialProofTarget( + request: TopicSocialProofRequest + ): Future[RecTopicSocialProofTarget] = { + val userId = request.userId + userStore.get(userId).map { userOpt => + val userRoles = userOpt.flatMap(_.roles.map(_.roles.toSet)) + + val context = request.context.copy(userId = Some(request.userId)) // override to make sure + + val params = paramBuilder + .buildFromTopicListingViewerContext(Some(context), request.displayLocation, userRoles) + + DefaultRecTopicSocialProofTarget( + request, + userId, + userOpt, + ABDeciderWithOverride(abDecider, None)(statsReceiver), + params + )(statsReceiver) + } + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/TopicSocialProofDecider.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/TopicSocialProofDecider.scala new file mode 100644 index 000000000..39a4acb89 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/TopicSocialProofDecider.scala @@ -0,0 +1,26 @@ +package com.twitter.tsp +package common + +import com.twitter.decider.Decider +import com.twitter.decider.RandomRecipient +import com.twitter.decider.Recipient +import com.twitter.simclusters_v2.common.DeciderGateBuilderWithIdHashing +import javax.inject.Inject + +case class TopicSocialProofDecider @Inject() (decider: Decider) { + + def isAvailable(feature: String, recipient: Option[Recipient]): Boolean = { + decider.isAvailable(feature, recipient) + } + + lazy val deciderGateBuilder = new DeciderGateBuilderWithIdHashing(decider) + + /** + * When useRandomRecipient is set to false, the decider is either completely on or off. + * When useRandomRecipient is set to true, the decider is on for the specified % of traffic. + */ + def isAvailable(feature: String, useRandomRecipient: Boolean = true): Boolean = { + if (useRandomRecipient) isAvailable(feature, Some(RandomRecipient)) + else isAvailable(feature, None) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/TopicSocialProofParams.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/TopicSocialProofParams.scala new file mode 100644 index 000000000..4effe1313 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/common/TopicSocialProofParams.scala @@ -0,0 +1,104 @@ +package com.twitter.tsp.common + +import com.twitter.finagle.stats.NullStatsReceiver +import com.twitter.logging.Logger +import com.twitter.timelines.configapi.BaseConfig +import com.twitter.timelines.configapi.BaseConfigBuilder +import com.twitter.timelines.configapi.FSBoundedParam +import com.twitter.timelines.configapi.FSParam +import com.twitter.timelines.configapi.FeatureSwitchOverrideUtil + +object TopicSocialProofParams { + + object TopicTweetsSemanticCoreVersionId + extends FSBoundedParam[Long]( + name = "topic_tweets_semantic_core_annotation_version_id", + default = 1433487161551032320L, + min = 0L, + max = Long.MaxValue + ) + object TopicTweetsSemanticCoreVersionIdsSet + extends FSParam[Set[Long]]( + name = "topic_tweets_semantic_core_annotation_version_id_allowed_set", + default = Set(TopicTweetsSemanticCoreVersionId.default)) + + /** + * Controls the Topic Social Proof cosine similarity threshold for the Topic Tweets. + */ + object TweetToTopicCosineSimilarityThreshold + extends FSBoundedParam[Double]( + name = "topic_tweets_cosine_similarity_threshold_tsp", + default = 0.0, + min = 0.0, + max = 1.0 + ) + + object EnablePersonalizedContextTopics // master feature switch to enable backfill + extends FSParam[Boolean]( + name = "topic_tweets_personalized_contexts_enable_personalized_contexts", + default = false + ) + + object EnableYouMightLikeTopic + extends FSParam[Boolean]( + name = "topic_tweets_personalized_contexts_enable_you_might_like", + default = false + ) + + object EnableRecentEngagementsTopic + extends FSParam[Boolean]( + name = "topic_tweets_personalized_contexts_enable_recent_engagements", + default = false + ) + + object EnableTopicTweetHealthFilterPersonalizedContexts + extends FSParam[Boolean]( + name = "topic_tweets_personalized_contexts_health_switch", + default = true + ) + + object EnableTweetToTopicScoreRanking + extends FSParam[Boolean]( + name = "topic_tweets_enable_tweet_to_topic_score_ranking", + default = true + ) + +} + +object FeatureSwitchConfig { + private val enumFeatureSwitchOverrides = FeatureSwitchOverrideUtil + .getEnumFSOverrides( + NullStatsReceiver, + Logger(getClass), + ) + + private val intFeatureSwitchOverrides = FeatureSwitchOverrideUtil.getBoundedIntFSOverrides() + + private val longFeatureSwitchOverrides = FeatureSwitchOverrideUtil.getBoundedLongFSOverrides( + TopicSocialProofParams.TopicTweetsSemanticCoreVersionId + ) + + private val doubleFeatureSwitchOverrides = FeatureSwitchOverrideUtil.getBoundedDoubleFSOverrides( + TopicSocialProofParams.TweetToTopicCosineSimilarityThreshold, + ) + + private val longSetFeatureSwitchOverrides = FeatureSwitchOverrideUtil.getLongSetFSOverrides( + TopicSocialProofParams.TopicTweetsSemanticCoreVersionIdsSet, + ) + + private val booleanFeatureSwitchOverrides = FeatureSwitchOverrideUtil.getBooleanFSOverrides( + TopicSocialProofParams.EnablePersonalizedContextTopics, + TopicSocialProofParams.EnableYouMightLikeTopic, + TopicSocialProofParams.EnableRecentEngagementsTopic, + TopicSocialProofParams.EnableTopicTweetHealthFilterPersonalizedContexts, + TopicSocialProofParams.EnableTweetToTopicScoreRanking, + ) + val config: BaseConfig = BaseConfigBuilder() + .set(enumFeatureSwitchOverrides: _*) + .set(intFeatureSwitchOverrides: _*) + .set(longFeatureSwitchOverrides: _*) + .set(doubleFeatureSwitchOverrides: _*) + .set(longSetFeatureSwitchOverrides: _*) + .set(booleanFeatureSwitchOverrides: _*) + .build() +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/BUILD b/topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/BUILD new file mode 100644 index 000000000..dc280e03d --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/BUILD @@ -0,0 +1,14 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = [ + "bazel-compatible", + ], + dependencies = [ + "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala", + "stitch/stitch-storehaus", + "topic-social-proof/server/src/main/scala/com/twitter/tsp/common", + "topic-social-proof/server/src/main/scala/com/twitter/tsp/stores", + "topic-social-proof/server/src/main/thrift:thrift-scala", + "topiclisting/topiclisting-core/src/main/scala/com/twitter/topiclisting", + ], +) diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/TopicSocialProofHandler.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/TopicSocialProofHandler.scala new file mode 100644 index 000000000..848ec1d72 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/TopicSocialProofHandler.scala @@ -0,0 +1,587 @@ +package com.twitter.tsp.handlers + +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle.mux.ClientDiscardedRequestException +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.common.util.StatsUtil +import com.twitter.simclusters_v2.common.SemanticCoreEntityId +import com.twitter.simclusters_v2.common.TweetId +import com.twitter.simclusters_v2.thriftscala.EmbeddingType +import com.twitter.simclusters_v2.thriftscala.ModelVersion +import com.twitter.strato.response.Err +import com.twitter.storehaus.ReadableStore +import com.twitter.timelines.configapi.Params +import com.twitter.topic_recos.common.Configs.ConsumerTopicEmbeddingType +import com.twitter.topic_recos.common.Configs.DefaultModelVersion +import com.twitter.topic_recos.common.Configs.ProducerTopicEmbeddingType +import com.twitter.topic_recos.common.Configs.TweetEmbeddingType +import com.twitter.topiclisting.TopicListingViewerContext +import com.twitter.topic_recos.common.LocaleUtil +import com.twitter.topiclisting.AnnotationRuleProvider +import com.twitter.tsp.common.DeciderConstants +import com.twitter.tsp.common.LoadShedder +import com.twitter.tsp.common.RecTargetFactory +import com.twitter.tsp.common.TopicSocialProofDecider +import com.twitter.tsp.common.TopicSocialProofParams +import com.twitter.tsp.stores.TopicSocialProofStore +import com.twitter.tsp.stores.TopicSocialProofStore.TopicSocialProof +import com.twitter.tsp.stores.UttTopicFilterStore +import com.twitter.tsp.stores.TopicTweetsCosineSimilarityAggregateStore.ScoreKey +import com.twitter.tsp.thriftscala.MetricTag +import com.twitter.tsp.thriftscala.TopicFollowType +import com.twitter.tsp.thriftscala.TopicListingSetting +import com.twitter.tsp.thriftscala.TopicSocialProofRequest +import com.twitter.tsp.thriftscala.TopicSocialProofResponse +import com.twitter.tsp.thriftscala.TopicWithScore +import com.twitter.tsp.thriftscala.TspTweetInfo +import com.twitter.tsp.utils.HealthSignalsUtils +import com.twitter.util.Future +import com.twitter.util.Timer +import com.twitter.util.Duration +import com.twitter.util.TimeoutException + +import scala.util.Random + +class TopicSocialProofHandler( + topicSocialProofStore: ReadableStore[TopicSocialProofStore.Query, Seq[TopicSocialProof]], + tweetInfoStore: ReadableStore[TweetId, TspTweetInfo], + uttTopicFilterStore: UttTopicFilterStore, + recTargetFactory: RecTargetFactory, + decider: TopicSocialProofDecider, + statsReceiver: StatsReceiver, + loadShedder: LoadShedder, + timer: Timer) { + + import TopicSocialProofHandler._ + + def getTopicSocialProofResponse( + request: TopicSocialProofRequest + ): Future[TopicSocialProofResponse] = { + val scopedStats = statsReceiver.scope(request.displayLocation.toString) + scopedStats.counter("fanoutRequests").incr(request.tweetIds.size) + scopedStats.stat("numTweetsPerRequest").add(request.tweetIds.size) + StatsUtil.trackBlockStats(scopedStats) { + recTargetFactory + .buildRecTopicSocialProofTarget(request).flatMap { target => + val enableCosineSimilarityScoreCalculation = + decider.isAvailable(DeciderConstants.enableTopicSocialProofScore) + + val semanticCoreVersionId = + target.params(TopicSocialProofParams.TopicTweetsSemanticCoreVersionId) + + val semanticCoreVersionIdsSet = + target.params(TopicSocialProofParams.TopicTweetsSemanticCoreVersionIdsSet) + + val allowListWithTopicFollowTypeFut = uttTopicFilterStore + .getAllowListTopicsForUser( + request.userId, + request.topicListingSetting, + TopicListingViewerContext + .fromThrift(request.context).copy(languageCode = + LocaleUtil.getStandardLanguageCode(request.context.languageCode)), + request.bypassModes.map(_.toSet) + ).rescue { + case _ => + scopedStats.counter("uttTopicFilterStoreFailure").incr() + Future.value(Map.empty[SemanticCoreEntityId, Option[TopicFollowType]]) + } + + val tweetInfoMapFut: Future[Map[TweetId, Option[TspTweetInfo]]] = Future + .collect( + tweetInfoStore.multiGet(request.tweetIds.toSet) + ).raiseWithin(TweetInfoStoreTimeout)(timer).rescue { + case _: TimeoutException => + scopedStats.counter("tweetInfoStoreTimeout").incr() + Future.value(Map.empty[TweetId, Option[TspTweetInfo]]) + case _ => + scopedStats.counter("tweetInfoStoreFailure").incr() + Future.value(Map.empty[TweetId, Option[TspTweetInfo]]) + } + + val definedTweetInfoMapFut = + keepTweetsWithTweetInfoAndLanguage(tweetInfoMapFut, request.displayLocation.toString) + + Future + .join(definedTweetInfoMapFut, allowListWithTopicFollowTypeFut).map { + case (tweetInfoMap, allowListWithTopicFollowType) => + val tweetIdsToQuery = tweetInfoMap.keys.toSet + val topicProofQueries = + tweetIdsToQuery.map { tweetId => + TopicSocialProofStore.Query( + TopicSocialProofStore.CacheableQuery( + tweetId = tweetId, + tweetLanguage = LocaleUtil.getSupportedStandardLanguageCodeWithDefault( + tweetInfoMap.getOrElse(tweetId, None).flatMap { + _.language + }), + enableCosineSimilarityScoreCalculation = + enableCosineSimilarityScoreCalculation + ), + allowedSemanticCoreVersionIds = semanticCoreVersionIdsSet + ) + } + + val topicSocialProofsFut: Future[Map[TweetId, Seq[TopicSocialProof]]] = { + Future + .collect(topicSocialProofStore.multiGet(topicProofQueries)).map(_.map { + case (query, results) => + query.cacheableQuery.tweetId -> results.toSeq.flatten.filter( + _.semanticCoreVersionId == semanticCoreVersionId) + }) + }.raiseWithin(TopicSocialProofStoreTimeout)(timer).rescue { + case _: TimeoutException => + scopedStats.counter("topicSocialProofStoreTimeout").incr() + Future(Map.empty[TweetId, Seq[TopicSocialProof]]) + case _ => + scopedStats.counter("topicSocialProofStoreFailure").incr() + Future(Map.empty[TweetId, Seq[TopicSocialProof]]) + } + + val random = new Random(seed = request.userId.toInt) + + topicSocialProofsFut.map { topicSocialProofs => + val filteredTopicSocialProofs = filterByAllowedList( + topicSocialProofs, + request.topicListingSetting, + allowListWithTopicFollowType.keySet + ) + + val filteredTopicSocialProofsEmptyCount: Int = + filteredTopicSocialProofs.count { + case (_, topicSocialProofs: Seq[TopicSocialProof]) => + topicSocialProofs.isEmpty + } + + scopedStats + .counter("filteredTopicSocialProofsCount").incr(filteredTopicSocialProofs.size) + scopedStats + .counter("filteredTopicSocialProofsEmptyCount").incr( + filteredTopicSocialProofsEmptyCount) + + if (isCrTopicTweets(request)) { + val socialProofs = filteredTopicSocialProofs.mapValues(_.flatMap { topicProof => + val topicWithScores = buildTopicWithRandomScore( + topicProof, + allowListWithTopicFollowType, + random + ) + topicWithScores + }) + TopicSocialProofResponse(socialProofs) + } else { + val socialProofs = filteredTopicSocialProofs.mapValues(_.flatMap { topicProof => + getTopicProofScore( + topicProof = topicProof, + allowListWithTopicFollowType = allowListWithTopicFollowType, + params = target.params, + random = random, + statsReceiver = statsReceiver + ) + + }.sortBy(-_.score).take(MaxCandidates)) + + val personalizedContextSocialProofs = + if (target.params(TopicSocialProofParams.EnablePersonalizedContextTopics)) { + val personalizedContextEligibility = + checkPersonalizedContextsEligibility( + target.params, + allowListWithTopicFollowType) + val filteredTweets = + filterPersonalizedContexts(socialProofs, tweetInfoMap, target.params) + backfillPersonalizedContexts( + allowListWithTopicFollowType, + filteredTweets, + request.tags.getOrElse(Map.empty), + personalizedContextEligibility) + } else { + Map.empty[TweetId, Seq[TopicWithScore]] + } + + val mergedSocialProofs = socialProofs.map { + case (tweetId, proofs) => + ( + tweetId, + proofs + ++ personalizedContextSocialProofs.getOrElse(tweetId, Seq.empty)) + } + + // Note that we will NOT filter out tweets with no TSP in either case + TopicSocialProofResponse(mergedSocialProofs) + } + } + } + }.flatten.raiseWithin(Timeout)(timer).rescue { + case _: ClientDiscardedRequestException => + scopedStats.counter("ClientDiscardedRequestException").incr() + Future.value(DefaultResponse) + case err: Err if err.code == Err.Cancelled => + scopedStats.counter("CancelledErr").incr() + Future.value(DefaultResponse) + case _ => + scopedStats.counter("FailedRequests").incr() + Future.value(DefaultResponse) + } + } + } + + /** + * Fetch the Score for each Topic Social Proof + */ + private def getTopicProofScore( + topicProof: TopicSocialProof, + allowListWithTopicFollowType: Map[SemanticCoreEntityId, Option[TopicFollowType]], + params: Params, + random: Random, + statsReceiver: StatsReceiver + ): Option[TopicWithScore] = { + val scopedStats = statsReceiver.scope("getTopicProofScores") + val enableTweetToTopicScoreRanking = + params(TopicSocialProofParams.EnableTweetToTopicScoreRanking) + + val minTweetToTopicCosineSimilarityThreshold = + params(TopicSocialProofParams.TweetToTopicCosineSimilarityThreshold) + + val topicWithScore = + if (enableTweetToTopicScoreRanking) { + scopedStats.counter("enableTweetToTopicScoreRanking").incr() + buildTopicWithValidScore( + topicProof, + TweetEmbeddingType, + Some(ConsumerTopicEmbeddingType), + Some(ProducerTopicEmbeddingType), + allowListWithTopicFollowType, + DefaultModelVersion, + minTweetToTopicCosineSimilarityThreshold + ) + } else { + scopedStats.counter("buildTopicWithRandomScore").incr() + buildTopicWithRandomScore( + topicProof, + allowListWithTopicFollowType, + random + ) + } + topicWithScore + + } + + private[handlers] def isCrTopicTweets( + request: TopicSocialProofRequest + ): Boolean = { + // CrTopic (across a variety of DisplayLocations) is the only use case with TopicListingSetting.All + request.topicListingSetting == TopicListingSetting.All + } + + /** + * Consolidate logics relevant to whether only quality topics should be enabled for Implicit Follows + */ + + /*** + * Consolidate logics relevant to whether Personalized Contexts backfilling should be enabled + */ + private[handlers] def checkPersonalizedContextsEligibility( + params: Params, + allowListWithTopicFollowType: Map[SemanticCoreEntityId, Option[TopicFollowType]] + ): PersonalizedContextEligibility = { + val scopedStats = statsReceiver.scope("checkPersonalizedContextsEligibility") + val isRecentFavInAllowlist = allowListWithTopicFollowType + .contains(AnnotationRuleProvider.recentFavTopicId) + + val isRecentFavEligible = + isRecentFavInAllowlist && params(TopicSocialProofParams.EnableRecentEngagementsTopic) + if (isRecentFavEligible) + scopedStats.counter("isRecentFavEligible").incr() + + val isRecentRetweetInAllowlist = allowListWithTopicFollowType + .contains(AnnotationRuleProvider.recentRetweetTopicId) + + val isRecentRetweetEligible = + isRecentRetweetInAllowlist && params(TopicSocialProofParams.EnableRecentEngagementsTopic) + if (isRecentRetweetEligible) + scopedStats.counter("isRecentRetweetEligible").incr() + + val isYMLInAllowlist = allowListWithTopicFollowType + .contains(AnnotationRuleProvider.youMightLikeTopicId) + + val isYMLEligible = + isYMLInAllowlist && params(TopicSocialProofParams.EnableYouMightLikeTopic) + if (isYMLEligible) + scopedStats.counter("isYMLEligible").incr() + + PersonalizedContextEligibility(isRecentFavEligible, isRecentRetweetEligible, isYMLEligible) + } + + private[handlers] def filterPersonalizedContexts( + socialProofs: Map[TweetId, Seq[TopicWithScore]], + tweetInfoMap: Map[TweetId, Option[TspTweetInfo]], + params: Params + ): Map[TweetId, Seq[TopicWithScore]] = { + val filters: Seq[(Option[TspTweetInfo], Params) => Boolean] = Seq( + healthSignalsFilter, + tweetLanguageFilter + ) + applyFilters(socialProofs, tweetInfoMap, params, filters) + } + + /** * + * filter tweets with None tweetInfo and undefined language + */ + private def keepTweetsWithTweetInfoAndLanguage( + tweetInfoMapFut: Future[Map[TweetId, Option[TspTweetInfo]]], + displayLocation: String + ): Future[Map[TweetId, Option[TspTweetInfo]]] = { + val scopedStats = statsReceiver.scope(displayLocation) + tweetInfoMapFut.map { tweetInfoMap => + val filteredTweetInfoMap = tweetInfoMap.filter { + case (_, optTweetInfo: Option[TspTweetInfo]) => + if (optTweetInfo.isEmpty) { + scopedStats.counter("undefinedTweetInfoCount").incr() + } + + optTweetInfo.exists { tweetInfo: TspTweetInfo => + { + if (tweetInfo.language.isEmpty) { + scopedStats.counter("undefinedLanguageCount").incr() + } + tweetInfo.language.isDefined + } + } + + } + val undefinedTweetInfoOrLangCount = tweetInfoMap.size - filteredTweetInfoMap.size + scopedStats.counter("undefinedTweetInfoOrLangCount").incr(undefinedTweetInfoOrLangCount) + + scopedStats.counter("TweetInfoCount").incr(tweetInfoMap.size) + + filteredTweetInfoMap + } + } + + /*** + * filter tweets with NO evergreen topic social proofs by their health signal scores & tweet languages + * i.e., tweets that are possible to be converted into Personalized Context topic tweets + * TBD: whether we are going to apply filters to all topic tweet candidates + */ + private def applyFilters( + socialProofs: Map[TweetId, Seq[TopicWithScore]], + tweetInfoMap: Map[TweetId, Option[TspTweetInfo]], + params: Params, + filters: Seq[(Option[TspTweetInfo], Params) => Boolean] + ): Map[TweetId, Seq[TopicWithScore]] = { + socialProofs.collect { + case (tweetId, socialProofs) if socialProofs.nonEmpty || filters.forall { filter => + filter(tweetInfoMap.getOrElse(tweetId, None), params) + } => + tweetId -> socialProofs + } + } + + private def healthSignalsFilter( + tweetInfoOpt: Option[TspTweetInfo], + params: Params + ): Boolean = { + !params( + TopicSocialProofParams.EnableTopicTweetHealthFilterPersonalizedContexts) || HealthSignalsUtils + .isHealthyTweet(tweetInfoOpt) + } + + private def tweetLanguageFilter( + tweetInfoOpt: Option[TspTweetInfo], + params: Params + ): Boolean = { + PersonalizedContextTopicsAllowedLanguageSet + .contains(tweetInfoOpt.flatMap(_.language).getOrElse(LocaleUtil.DefaultLanguage)) + } + + private[handlers] def backfillPersonalizedContexts( + allowListWithTopicFollowType: Map[SemanticCoreEntityId, Option[TopicFollowType]], + socialProofs: Map[TweetId, Seq[TopicWithScore]], + metricTagsMap: scala.collection.Map[TweetId, scala.collection.Set[MetricTag]], + personalizedContextEligibility: PersonalizedContextEligibility + ): Map[TweetId, Seq[TopicWithScore]] = { + val scopedStats = statsReceiver.scope("backfillPersonalizedContexts") + socialProofs.map { + case (tweetId, topicWithScores) => + if (topicWithScores.nonEmpty) { + tweetId -> Seq.empty + } else { + val metricTagContainsTweetFav = metricTagsMap + .getOrElse(tweetId, Set.empty[MetricTag]).contains(MetricTag.TweetFavorite) + val backfillRecentFav = + personalizedContextEligibility.isRecentFavEligible && metricTagContainsTweetFav + if (metricTagContainsTweetFav) + scopedStats.counter("MetricTag.TweetFavorite").incr() + if (backfillRecentFav) + scopedStats.counter("backfillRecentFav").incr() + + val metricTagContainsRetweet = metricTagsMap + .getOrElse(tweetId, Set.empty[MetricTag]).contains(MetricTag.Retweet) + val backfillRecentRetweet = + personalizedContextEligibility.isRecentRetweetEligible && metricTagContainsRetweet + if (metricTagContainsRetweet) + scopedStats.counter("MetricTag.Retweet").incr() + if (backfillRecentRetweet) + scopedStats.counter("backfillRecentRetweet").incr() + + val metricTagContainsRecentSearches = metricTagsMap + .getOrElse(tweetId, Set.empty[MetricTag]).contains( + MetricTag.InterestsRankerRecentSearches) + + val backfillYML = personalizedContextEligibility.isYMLEligible + if (backfillYML) + scopedStats.counter("backfillYML").incr() + + tweetId -> buildBackfillTopics( + allowListWithTopicFollowType, + backfillRecentFav, + backfillRecentRetweet, + backfillYML) + } + } + } + + private def buildBackfillTopics( + allowListWithTopicFollowType: Map[SemanticCoreEntityId, Option[TopicFollowType]], + backfillRecentFav: Boolean, + backfillRecentRetweet: Boolean, + backfillYML: Boolean + ): Seq[TopicWithScore] = { + Seq( + if (backfillRecentFav) { + Some( + TopicWithScore( + topicId = AnnotationRuleProvider.recentFavTopicId, + score = 1.0, + topicFollowType = allowListWithTopicFollowType + .getOrElse(AnnotationRuleProvider.recentFavTopicId, None) + )) + } else { None }, + if (backfillRecentRetweet) { + Some( + TopicWithScore( + topicId = AnnotationRuleProvider.recentRetweetTopicId, + score = 1.0, + topicFollowType = allowListWithTopicFollowType + .getOrElse(AnnotationRuleProvider.recentRetweetTopicId, None) + )) + } else { None }, + if (backfillYML) { + Some( + TopicWithScore( + topicId = AnnotationRuleProvider.youMightLikeTopicId, + score = 1.0, + topicFollowType = allowListWithTopicFollowType + .getOrElse(AnnotationRuleProvider.youMightLikeTopicId, None) + )) + } else { None } + ).flatten + } + + def toReadableStore: ReadableStore[TopicSocialProofRequest, TopicSocialProofResponse] = { + new ReadableStore[TopicSocialProofRequest, TopicSocialProofResponse] { + override def get(k: TopicSocialProofRequest): Future[Option[TopicSocialProofResponse]] = { + val displayLocation = k.displayLocation.toString + loadShedder(displayLocation) { + getTopicSocialProofResponse(k).map(Some(_)) + }.rescue { + case LoadShedder.LoadSheddingException => + statsReceiver.scope(displayLocation).counter("LoadSheddingException").incr() + Future.None + case _ => + statsReceiver.scope(displayLocation).counter("Exception").incr() + Future.None + } + } + } + } +} + +object TopicSocialProofHandler { + + private val MaxCandidates = 10 + // Currently we do hardcode for the language check of PersonalizedContexts Topics + private val PersonalizedContextTopicsAllowedLanguageSet: Set[String] = + Set("pt", "ko", "es", "ja", "tr", "id", "en", "hi", "ar", "fr", "ru") + + private val Timeout: Duration = 200.milliseconds + private val TopicSocialProofStoreTimeout: Duration = 40.milliseconds + private val TweetInfoStoreTimeout: Duration = 60.milliseconds + private val DefaultResponse: TopicSocialProofResponse = TopicSocialProofResponse(Map.empty) + + case class PersonalizedContextEligibility( + isRecentFavEligible: Boolean, + isRecentRetweetEligible: Boolean, + isYMLEligible: Boolean) + + /** + * Calculate the Topic Scores for each (tweet, topic), filter out topic proofs whose scores do not + * pass the minimum threshold + */ + private[handlers] def buildTopicWithValidScore( + topicProof: TopicSocialProof, + tweetEmbeddingType: EmbeddingType, + maybeConsumerEmbeddingType: Option[EmbeddingType], + maybeProducerEmbeddingType: Option[EmbeddingType], + allowListWithTopicFollowType: Map[SemanticCoreEntityId, Option[TopicFollowType]], + simClustersModelVersion: ModelVersion, + minTweetToTopicCosineSimilarityThreshold: Double + ): Option[TopicWithScore] = { + + val consumerScore = maybeConsumerEmbeddingType + .flatMap { consumerEmbeddingType => + topicProof.scores.get( + ScoreKey(consumerEmbeddingType, tweetEmbeddingType, simClustersModelVersion)) + }.getOrElse(0.0) + + val producerScore = maybeProducerEmbeddingType + .flatMap { producerEmbeddingType => + topicProof.scores.get( + ScoreKey(producerEmbeddingType, tweetEmbeddingType, simClustersModelVersion)) + }.getOrElse(0.0) + + val combinedScore = consumerScore + producerScore + if (combinedScore > minTweetToTopicCosineSimilarityThreshold || topicProof.ignoreSimClusterFiltering) { + Some( + TopicWithScore( + topicId = topicProof.topicId.entityId, + score = combinedScore, + topicFollowType = + allowListWithTopicFollowType.getOrElse(topicProof.topicId.entityId, None))) + } else { + None + } + } + + private[handlers] def buildTopicWithRandomScore( + topicSocialProof: TopicSocialProof, + allowListWithTopicFollowType: Map[SemanticCoreEntityId, Option[TopicFollowType]], + random: Random + ): Option[TopicWithScore] = { + + Some( + TopicWithScore( + topicId = topicSocialProof.topicId.entityId, + score = random.nextDouble(), + topicFollowType = + allowListWithTopicFollowType.getOrElse(topicSocialProof.topicId.entityId, None) + )) + } + + /** + * Filter all the non-qualified Topic Social Proof + */ + private[handlers] def filterByAllowedList( + topicProofs: Map[TweetId, Seq[TopicSocialProof]], + setting: TopicListingSetting, + allowList: Set[SemanticCoreEntityId] + ): Map[TweetId, Seq[TopicSocialProof]] = { + setting match { + case TopicListingSetting.All => + // Return all the topics + topicProofs + case _ => + topicProofs.mapValues( + _.filter(topicProof => allowList.contains(topicProof.topicId.entityId))) + } + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/UttChildrenWarmupHandler.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/UttChildrenWarmupHandler.scala new file mode 100644 index 000000000..b431685c8 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/UttChildrenWarmupHandler.scala @@ -0,0 +1,40 @@ +package com.twitter.tsp.handlers + +import com.twitter.inject.utils.Handler +import com.twitter.topiclisting.FollowableTopicProductId +import com.twitter.topiclisting.ProductId +import com.twitter.topiclisting.TopicListingViewerContext +import com.twitter.topiclisting.utt.UttLocalization +import com.twitter.util.logging.Logging +import javax.inject.Inject +import javax.inject.Singleton + +/** * + * We configure Warmer to help warm up the cache hit rate under `CachedUttClient/get_utt_taxonomy/cache_hit_rate` + * In uttLocalization.getRecommendableTopics, we fetch all topics exist in UTT, and yet the process + * is in fact fetching the complete UTT tree struct (by calling getUttChildren recursively), which could take 1 sec + * Once we have the topics, we stored them in in-memory cache, and the cache hit rate is > 99% + * + */ +@Singleton +class UttChildrenWarmupHandler @Inject() (uttLocalization: UttLocalization) + extends Handler + with Logging { + + /** Executes the function of this handler. * */ + override def handle(): Unit = { + uttLocalization + .getRecommendableTopics( + productId = ProductId.Followable, + viewerContext = TopicListingViewerContext(languageCode = Some("en")), + enableInternationalTopics = true, + followableTopicProductId = FollowableTopicProductId.AllFollowable + ) + .onSuccess { result => + logger.info(s"successfully warmed up UttChildren. TopicId length = ${result.size}") + } + .onFailure { throwable => + logger.info(s"failed to warm up UttChildren. Throwable = ${throwable}") + } + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/BUILD b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/BUILD new file mode 100644 index 000000000..d68c9ad23 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/BUILD @@ -0,0 +1,30 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = [ + "bazel-compatible", + ], + dependencies = [ + "3rdparty/jvm/com/twitter/bijection:scrooge", + "3rdparty/jvm/com/twitter/storehaus:memcache", + "escherbird/src/scala/com/twitter/escherbird/util/uttclient", + "escherbird/src/thrift/com/twitter/escherbird/utt:strato-columns-scala", + "finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/authentication", + "finatra-internal/mtls-thriftmux/src/main/scala", + "finatra/inject/inject-core/src/main/scala", + "finatra/inject/inject-thrift-client", + "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/strato", + "hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common", + "src/scala/com/twitter/storehaus_internal/memcache", + "src/scala/com/twitter/storehaus_internal/util", + "src/thrift/com/twitter/gizmoduck:thrift-scala", + "src/thrift/com/twitter/gizmoduck:user-thrift-scala", + "stitch/stitch-storehaus", + "stitch/stitch-tweetypie/src/main/scala", + "topic-social-proof/server/src/main/scala/com/twitter/tsp/common", + "topic-social-proof/server/src/main/scala/com/twitter/tsp/stores", + "topic-social-proof/server/src/main/scala/com/twitter/tsp/utils", + "topic-social-proof/server/src/main/thrift:thrift-scala", + "topiclisting/common/src/main/scala/com/twitter/topiclisting/clients", + "topiclisting/topiclisting-utt/src/main/scala/com/twitter/topiclisting/utt", + ], +) diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/GizmoduckUserModule.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/GizmoduckUserModule.scala new file mode 100644 index 000000000..a700d9fef --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/GizmoduckUserModule.scala @@ -0,0 +1,35 @@ +package com.twitter.tsp.modules + +import com.google.inject.Module +import com.twitter.finagle.ThriftMux +import com.twitter.finagle.mtls.authentication.ServiceIdentifier +import com.twitter.finagle.mtls.client.MtlsStackClient._ +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.finagle.thrift.ClientId +import com.twitter.finatra.mtls.thriftmux.modules.MtlsClient +import com.twitter.gizmoduck.thriftscala.UserService +import com.twitter.inject.Injector +import com.twitter.inject.thrift.modules.ThriftMethodBuilderClientModule + +object GizmoduckUserModule + extends ThriftMethodBuilderClientModule[ + UserService.ServicePerEndpoint, + UserService.MethodPerEndpoint + ] + with MtlsClient { + + override val label: String = "gizmoduck" + override val dest: String = "/s/gizmoduck/gizmoduck" + override val modules: Seq[Module] = Seq(TSPClientIdModule) + + override def configureThriftMuxClient( + injector: Injector, + client: ThriftMux.Client + ): ThriftMux.Client = { + super + .configureThriftMuxClient(injector, client) + .withMutualTls(injector.instance[ServiceIdentifier]) + .withClientId(injector.instance[ClientId]) + .withStatsReceiver(injector.instance[StatsReceiver].scope("giz")) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/RepresentationScorerStoreModule.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/RepresentationScorerStoreModule.scala new file mode 100644 index 000000000..329276d8d --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/RepresentationScorerStoreModule.scala @@ -0,0 +1,47 @@ +package com.twitter.tsp.modules + +import com.google.inject.Module +import com.google.inject.Provides +import com.google.inject.Singleton +import com.twitter.app.Flag +import com.twitter.bijection.scrooge.BinaryScalaCodec +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle.memcached.{Client => MemClient} +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.hermit.store.common.ObservedMemcachedReadableStore +import com.twitter.inject.TwitterModule +import com.twitter.simclusters_v2.thriftscala.Score +import com.twitter.simclusters_v2.thriftscala.ScoreId +import com.twitter.storehaus.ReadableStore +import com.twitter.strato.client.{Client => StratoClient} +import com.twitter.tsp.stores.RepresentationScorerStore + +object RepresentationScorerStoreModule extends TwitterModule { + override def modules: Seq[Module] = Seq(UnifiedCacheClient) + + private val tspRepresentationScoringColumnPath: Flag[String] = flag[String]( + name = "tsp.representationScoringColumnPath", + default = "recommendations/representation_scorer/score", + help = "Strato column path for Representation Scorer Store" + ) + + @Provides + @Singleton + def providesRepresentationScorerStore( + statsReceiver: StatsReceiver, + stratoClient: StratoClient, + tspUnifiedCacheClient: MemClient + ): ReadableStore[ScoreId, Score] = { + val underlyingStore = + RepresentationScorerStore(stratoClient, tspRepresentationScoringColumnPath(), statsReceiver) + ObservedMemcachedReadableStore.fromCacheClient( + backingStore = underlyingStore, + cacheClient = tspUnifiedCacheClient, + ttl = 2.hours + )( + valueInjection = BinaryScalaCodec(Score), + statsReceiver = statsReceiver.scope("RepresentationScorerStore"), + keyToString = { k: ScoreId => s"rsx/$k" } + ) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TSPClientIdModule.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TSPClientIdModule.scala new file mode 100644 index 000000000..d22ef500f --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TSPClientIdModule.scala @@ -0,0 +1,14 @@ +package com.twitter.tsp.modules + +import com.google.inject.Provides +import com.twitter.finagle.thrift.ClientId +import com.twitter.inject.TwitterModule +import javax.inject.Singleton + +object TSPClientIdModule extends TwitterModule { + private val clientIdFlag = flag("thrift.clientId", "topic-social-proof.prod", "Thrift client id") + + @Provides + @Singleton + def providesClientId: ClientId = ClientId(clientIdFlag()) +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TopicListingModule.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TopicListingModule.scala new file mode 100644 index 000000000..3f2768278 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TopicListingModule.scala @@ -0,0 +1,17 @@ +package com.twitter.tsp.modules + +import com.google.inject.Provides +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.inject.TwitterModule +import com.twitter.topiclisting.TopicListing +import com.twitter.topiclisting.TopicListingBuilder +import javax.inject.Singleton + +object TopicListingModule extends TwitterModule { + + @Provides + @Singleton + def providesTopicListing(statsReceiver: StatsReceiver): TopicListing = { + new TopicListingBuilder(statsReceiver.scope(namespace = "TopicListingBuilder")).build + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TopicSocialProofStoreModule.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TopicSocialProofStoreModule.scala new file mode 100644 index 000000000..fe63b0e21 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TopicSocialProofStoreModule.scala @@ -0,0 +1,68 @@ +package com.twitter.tsp.modules + +import com.google.inject.Module +import com.google.inject.Provides +import com.google.inject.Singleton +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle.memcached.{Client => MemClient} +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.hermit.store.common.ObservedCachedReadableStore +import com.twitter.hermit.store.common.ObservedMemcachedReadableStore +import com.twitter.hermit.store.common.ObservedReadableStore +import com.twitter.inject.TwitterModule +import com.twitter.simclusters_v2.common.TweetId +import com.twitter.simclusters_v2.thriftscala.Score +import com.twitter.simclusters_v2.thriftscala.ScoreId +import com.twitter.storehaus.ReadableStore +import com.twitter.strato.client.{Client => StratoClient} +import com.twitter.tsp.stores.SemanticCoreAnnotationStore +import com.twitter.tsp.stores.TopicSocialProofStore +import com.twitter.tsp.stores.TopicSocialProofStore.TopicSocialProof +import com.twitter.tsp.utils.LZ4Injection +import com.twitter.tsp.utils.SeqObjectInjection + +object TopicSocialProofStoreModule extends TwitterModule { + override def modules: Seq[Module] = Seq(UnifiedCacheClient) + + @Provides + @Singleton + def providesTopicSocialProofStore( + representationScorerStore: ReadableStore[ScoreId, Score], + statsReceiver: StatsReceiver, + stratoClient: StratoClient, + tspUnifiedCacheClient: MemClient, + ): ReadableStore[TopicSocialProofStore.Query, Seq[TopicSocialProof]] = { + val semanticCoreAnnotationStore: ReadableStore[TweetId, Seq[ + SemanticCoreAnnotationStore.TopicAnnotation + ]] = ObservedReadableStore( + SemanticCoreAnnotationStore(SemanticCoreAnnotationStore.getStratoStore(stratoClient)) + )(statsReceiver.scope("SemanticCoreAnnotationStore")) + + val underlyingStore = TopicSocialProofStore( + representationScorerStore, + semanticCoreAnnotationStore + )(statsReceiver.scope("TopicSocialProofStore")) + + val memcachedStore = ObservedMemcachedReadableStore.fromCacheClient( + backingStore = underlyingStore, + cacheClient = tspUnifiedCacheClient, + ttl = 15.minutes, + asyncUpdate = true + )( + valueInjection = LZ4Injection.compose(SeqObjectInjection[TopicSocialProof]()), + statsReceiver = statsReceiver.scope("memCachedTopicSocialProofStore"), + keyToString = { k: TopicSocialProofStore.Query => s"tsps/${k.cacheableQuery}" } + ) + + val inMemoryCachedStore = + ObservedCachedReadableStore.from[TopicSocialProofStore.Query, Seq[TopicSocialProof]]( + memcachedStore, + ttl = 10.minutes, + maxKeys = 16777215, // ~ avg 160B, < 3000MB + cacheName = "topic_social_proof_cache", + windowSize = 10000L + )(statsReceiver.scope("InMemoryCachedTopicSocialProofStore")) + + inMemoryCachedStore + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TopicTweetCosineSimilarityAggregateStoreModule.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TopicTweetCosineSimilarityAggregateStoreModule.scala new file mode 100644 index 000000000..ac15b3746 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TopicTweetCosineSimilarityAggregateStoreModule.scala @@ -0,0 +1,26 @@ +package com.twitter.tsp.modules + +import com.google.inject.Provides +import com.google.inject.Singleton +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.inject.TwitterModule +import com.twitter.simclusters_v2.common.TweetId +import com.twitter.simclusters_v2.thriftscala.Score +import com.twitter.simclusters_v2.thriftscala.ScoreId +import com.twitter.simclusters_v2.thriftscala.TopicId +import com.twitter.storehaus.ReadableStore +import com.twitter.tsp.stores.TopicTweetsCosineSimilarityAggregateStore +import com.twitter.tsp.stores.TopicTweetsCosineSimilarityAggregateStore.ScoreKey + +object TopicTweetCosineSimilarityAggregateStoreModule extends TwitterModule { + + @Provides + @Singleton + def providesTopicTweetCosineSimilarityAggregateStore( + representationScorerStore: ReadableStore[ScoreId, Score], + statsReceiver: StatsReceiver, + ): ReadableStore[(TopicId, TweetId, Seq[ScoreKey]), Map[ScoreKey, Double]] = { + TopicTweetsCosineSimilarityAggregateStore(representationScorerStore)( + statsReceiver.scope("topicTweetsCosineSimilarityAggregateStore")) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TweetInfoStoreModule.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TweetInfoStoreModule.scala new file mode 100644 index 000000000..1e08a9209 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TweetInfoStoreModule.scala @@ -0,0 +1,130 @@ +package com.twitter.tsp.modules + +import com.google.inject.Module +import com.google.inject.Provides +import com.google.inject.Singleton +import com.twitter.bijection.scrooge.BinaryScalaCodec +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle.memcached.{Client => MemClient} +import com.twitter.finagle.mtls.authentication.ServiceIdentifier +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.common.store.health.TweetHealthModelStore +import com.twitter.frigate.common.store.health.TweetHealthModelStore.TweetHealthModelStoreConfig +import com.twitter.frigate.common.store.health.UserHealthModelStore +import com.twitter.frigate.common.store.interests.UserId +import com.twitter.frigate.thriftscala.TweetHealthScores +import com.twitter.frigate.thriftscala.UserAgathaScores +import com.twitter.hermit.store.common.DeciderableReadableStore +import com.twitter.hermit.store.common.ObservedCachedReadableStore +import com.twitter.hermit.store.common.ObservedMemcachedReadableStore +import com.twitter.inject.TwitterModule +import com.twitter.simclusters_v2.common.TweetId +import com.twitter.stitch.tweetypie.TweetyPie +import com.twitter.storehaus.ReadableStore +import com.twitter.strato.client.{Client => StratoClient} +import com.twitter.tsp.common.DeciderKey +import com.twitter.tsp.common.TopicSocialProofDecider +import com.twitter.tsp.stores.TweetInfoStore +import com.twitter.tsp.stores.TweetyPieFieldsStore +import com.twitter.tweetypie.thriftscala.TweetService +import com.twitter.tsp.thriftscala.TspTweetInfo +import com.twitter.util.JavaTimer +import com.twitter.util.Timer + +object TweetInfoStoreModule extends TwitterModule { + override def modules: Seq[Module] = Seq(UnifiedCacheClient) + implicit val timer: Timer = new JavaTimer(true) + + @Provides + @Singleton + def providesTweetInfoStore( + decider: TopicSocialProofDecider, + serviceIdentifier: ServiceIdentifier, + statsReceiver: StatsReceiver, + stratoClient: StratoClient, + tspUnifiedCacheClient: MemClient, + tweetyPieService: TweetService.MethodPerEndpoint + ): ReadableStore[TweetId, TspTweetInfo] = { + val tweetHealthModelStore: ReadableStore[TweetId, TweetHealthScores] = { + val underlyingStore = TweetHealthModelStore.buildReadableStore( + stratoClient, + Some( + TweetHealthModelStoreConfig( + enablePBlock = true, + enableToxicity = true, + enablePSpammy = true, + enablePReported = true, + enableSpammyTweetContent = true, + enablePNegMultimodal = false)) + )(statsReceiver.scope("UnderlyingTweetHealthModelStore")) + + DeciderableReadableStore( + ObservedMemcachedReadableStore.fromCacheClient( + backingStore = underlyingStore, + cacheClient = tspUnifiedCacheClient, + ttl = 2.hours + )( + valueInjection = BinaryScalaCodec(TweetHealthScores), + statsReceiver = statsReceiver.scope("TweetHealthModelStore"), + keyToString = { k: TweetId => s"tHMS/$k" } + ), + decider.deciderGateBuilder.idGate(DeciderKey.enableHealthSignalsScoreDeciderKey), + statsReceiver.scope("TweetHealthModelStore") + ) + } + + val userHealthModelStore: ReadableStore[UserId, UserAgathaScores] = { + val underlyingStore = + UserHealthModelStore.buildReadableStore(stratoClient)( + statsReceiver.scope("UnderlyingUserHealthModelStore")) + + DeciderableReadableStore( + ObservedMemcachedReadableStore.fromCacheClient( + backingStore = underlyingStore, + cacheClient = tspUnifiedCacheClient, + ttl = 18.hours + )( + valueInjection = BinaryScalaCodec(UserAgathaScores), + statsReceiver = statsReceiver.scope("UserHealthModelStore"), + keyToString = { k: UserId => s"uHMS/$k" } + ), + decider.deciderGateBuilder.idGate(DeciderKey.enableUserAgathaScoreDeciderKey), + statsReceiver.scope("UserHealthModelStore") + ) + } + + val tweetInfoStore: ReadableStore[TweetId, TspTweetInfo] = { + val underlyingStore = TweetInfoStore( + TweetyPieFieldsStore.getStoreFromTweetyPie(TweetyPie(tweetyPieService, statsReceiver)), + tweetHealthModelStore: ReadableStore[TweetId, TweetHealthScores], + userHealthModelStore: ReadableStore[UserId, UserAgathaScores], + timer: Timer + )(statsReceiver.scope("tweetInfoStore")) + + val memcachedStore = ObservedMemcachedReadableStore.fromCacheClient( + backingStore = underlyingStore, + cacheClient = tspUnifiedCacheClient, + ttl = 15.minutes, + // Hydrating tweetInfo is now a required step for all candidates, + // hence we needed to tune these thresholds. + asyncUpdate = serviceIdentifier.environment == "prod" + )( + valueInjection = BinaryScalaCodec(TspTweetInfo), + statsReceiver = statsReceiver.scope("memCachedTweetInfoStore"), + keyToString = { k: TweetId => s"tIS/$k" } + ) + + val inMemoryStore = ObservedCachedReadableStore.from( + memcachedStore, + ttl = 15.minutes, + maxKeys = 8388607, // Check TweetInfo definition. size~92b. Around 736 MB + windowSize = 10000L, + cacheName = "tweet_info_cache", + maxMultiGetSize = 20 + )(statsReceiver.scope("inMemoryCachedTweetInfoStore")) + + inMemoryStore + } + tweetInfoStore + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TweetyPieClientModule.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TweetyPieClientModule.scala new file mode 100644 index 000000000..98d515dda --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/TweetyPieClientModule.scala @@ -0,0 +1,63 @@ +package com.twitter.tsp +package modules + +import com.google.inject.Module +import com.google.inject.Provides +import com.twitter.conversions.DurationOps.richDurationFromInt +import com.twitter.finagle.ThriftMux +import com.twitter.finagle.mtls.authentication.ServiceIdentifier +import com.twitter.finagle.mtls.client.MtlsStackClient.MtlsThriftMuxClientSyntax +import com.twitter.finagle.mux.ClientDiscardedRequestException +import com.twitter.finagle.service.ReqRep +import com.twitter.finagle.service.ResponseClass +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.finagle.thrift.ClientId +import com.twitter.inject.Injector +import com.twitter.inject.thrift.modules.ThriftMethodBuilderClientModule +import com.twitter.tweetypie.thriftscala.TweetService +import com.twitter.util.Duration +import com.twitter.util.Throw +import com.twitter.stitch.tweetypie.{TweetyPie => STweetyPie} +import com.twitter.finatra.mtls.thriftmux.modules.MtlsClient +import javax.inject.Singleton + +object TweetyPieClientModule + extends ThriftMethodBuilderClientModule[ + TweetService.ServicePerEndpoint, + TweetService.MethodPerEndpoint + ] + with MtlsClient { + override val label = "tweetypie" + override val dest = "/s/tweetypie/tweetypie" + override val requestTimeout: Duration = 450.milliseconds + + override val modules: Seq[Module] = Seq(TSPClientIdModule) + + // We bump the success rate from the default of 0.8 to 0.9 since we're dropping the + // consecutive failures part of the default policy. + override def configureThriftMuxClient( + injector: Injector, + client: ThriftMux.Client + ): ThriftMux.Client = + super + .configureThriftMuxClient(injector, client) + .withMutualTls(injector.instance[ServiceIdentifier]) + .withStatsReceiver(injector.instance[StatsReceiver].scope("clnt")) + .withClientId(injector.instance[ClientId]) + .withResponseClassifier { + case ReqRep(_, Throw(_: ClientDiscardedRequestException)) => ResponseClass.Ignorable + } + .withSessionQualifier + .successRateFailureAccrual(successRate = 0.9, window = 30.seconds) + .withResponseClassifier { + case ReqRep(_, Throw(_: ClientDiscardedRequestException)) => ResponseClass.Ignorable + } + + @Provides + @Singleton + def providesTweetyPie( + tweetyPieService: TweetService.MethodPerEndpoint + ): STweetyPie = { + STweetyPie(tweetyPieService) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/UnifiedCacheClient.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/UnifiedCacheClient.scala new file mode 100644 index 000000000..8fe65fc73 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/UnifiedCacheClient.scala @@ -0,0 +1,33 @@ +package com.twitter.tsp.modules + +import com.google.inject.Provides +import com.google.inject.Singleton +import com.twitter.app.Flag +import com.twitter.finagle.memcached.Client +import com.twitter.finagle.mtls.authentication.ServiceIdentifier +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.inject.TwitterModule +import com.twitter.storehaus_internal.memcache.MemcacheStore +import com.twitter.storehaus_internal.util.ClientName +import com.twitter.storehaus_internal.util.ZkEndPoint + +object UnifiedCacheClient extends TwitterModule { + val tspUnifiedCacheDest: Flag[String] = flag[String]( + name = "tsp.unifiedCacheDest", + default = "/srv#/prod/local/cache/topic_social_proof_unified", + help = "Wily path to topic social proof unified cache" + ) + + @Provides + @Singleton + def provideUnifiedCacheClient( + serviceIdentifier: ServiceIdentifier, + statsReceiver: StatsReceiver, + ): Client = + MemcacheStore.memcachedClient( + name = ClientName("topic-social-proof-unified-memcache"), + dest = ZkEndPoint(tspUnifiedCacheDest()), + statsReceiver = statsReceiver.scope("cache_client"), + serviceIdentifier = serviceIdentifier + ) +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/UttClientModule.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/UttClientModule.scala new file mode 100644 index 000000000..ae0099b8b --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/UttClientModule.scala @@ -0,0 +1,41 @@ +package com.twitter.tsp.modules + +import com.google.inject.Provides +import com.twitter.escherbird.util.uttclient.CacheConfigV2 +import com.twitter.escherbird.util.uttclient.CachedUttClientV2 +import com.twitter.escherbird.util.uttclient.UttClientCacheConfigsV2 +import com.twitter.escherbird.utt.strato.thriftscala.Environment +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.inject.TwitterModule +import com.twitter.strato.client.Client +import com.twitter.topiclisting.clients.utt.UttClient +import javax.inject.Singleton + +object UttClientModule extends TwitterModule { + + @Provides + @Singleton + def providesUttClient( + stratoClient: Client, + statsReceiver: StatsReceiver + ): UttClient = { + + // Save 2 ^ 18 UTTs. Promising 100% cache rate + lazy val defaultCacheConfigV2: CacheConfigV2 = CacheConfigV2(262143) + lazy val uttClientCacheConfigsV2: UttClientCacheConfigsV2 = UttClientCacheConfigsV2( + getTaxonomyConfig = defaultCacheConfigV2, + getUttTaxonomyConfig = defaultCacheConfigV2, + getLeafIds = defaultCacheConfigV2, + getLeafUttEntities = defaultCacheConfigV2 + ) + + // CachedUttClient to use StratoClient + lazy val cachedUttClientV2: CachedUttClientV2 = new CachedUttClientV2( + stratoClient = stratoClient, + env = Environment.Prod, + cacheConfigs = uttClientCacheConfigsV2, + statsReceiver = statsReceiver.scope("CachedUttClient") + ) + new UttClient(cachedUttClientV2, statsReceiver) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/UttLocalizationModule.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/UttLocalizationModule.scala new file mode 100644 index 000000000..7d8844b98 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/modules/UttLocalizationModule.scala @@ -0,0 +1,27 @@ +package com.twitter.tsp.modules + +import com.google.inject.Provides +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.inject.TwitterModule +import com.twitter.topiclisting.TopicListing +import com.twitter.topiclisting.clients.utt.UttClient +import com.twitter.topiclisting.utt.UttLocalization +import com.twitter.topiclisting.utt.UttLocalizationImpl +import javax.inject.Singleton + +object UttLocalizationModule extends TwitterModule { + + @Provides + @Singleton + def providesUttLocalization( + topicListing: TopicListing, + uttClient: UttClient, + statsReceiver: StatsReceiver + ): UttLocalization = { + new UttLocalizationImpl( + topicListing, + uttClient, + statsReceiver + ) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/service/BUILD b/topic-social-proof/server/src/main/scala/com/twitter/tsp/service/BUILD new file mode 100644 index 000000000..372962922 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/service/BUILD @@ -0,0 +1,23 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = [ + "bazel-compatible", + ], + dependencies = [ + "3rdparty/jvm/javax/inject:javax.inject", + "abdecider/src/main/scala", + "content-recommender/thrift/src/main/thrift:thrift-scala", + "hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common", + "hermit/hermit-core/src/main/scala/com/twitter/hermit/store/gizmoduck", + "src/scala/com/twitter/topic_recos/stores", + "src/thrift/com/twitter/gizmoduck:thrift-scala", + "src/thrift/com/twitter/gizmoduck:user-thrift-scala", + "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala", + "stitch/stitch-storehaus", + "topic-social-proof/server/src/main/scala/com/twitter/tsp/common", + "topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers", + "topic-social-proof/server/src/main/scala/com/twitter/tsp/modules", + "topic-social-proof/server/src/main/scala/com/twitter/tsp/stores", + "topic-social-proof/server/src/main/thrift:thrift-scala", + ], +) diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/service/TopicSocialProofService.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/service/TopicSocialProofService.scala new file mode 100644 index 000000000..f123e819f --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/service/TopicSocialProofService.scala @@ -0,0 +1,182 @@ +package com.twitter.tsp.service + +import com.twitter.abdecider.ABDeciderFactory +import com.twitter.abdecider.LoggingABDecider +import com.twitter.tsp.thriftscala.TspTweetInfo +import com.twitter.discovery.common.configapi.FeatureContextBuilder +import com.twitter.finagle.mtls.authentication.ServiceIdentifier +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.gizmoduck.thriftscala.LookupContext +import com.twitter.gizmoduck.thriftscala.QueryFields +import com.twitter.gizmoduck.thriftscala.User +import com.twitter.gizmoduck.thriftscala.UserService +import com.twitter.hermit.store.gizmoduck.GizmoduckUserStore +import com.twitter.logging.Logger +import com.twitter.simclusters_v2.common.SemanticCoreEntityId +import com.twitter.simclusters_v2.common.TweetId +import com.twitter.simclusters_v2.common.UserId +import com.twitter.spam.rtf.thriftscala.SafetyLevel +import com.twitter.stitch.storehaus.StitchOfReadableStore +import com.twitter.storehaus.ReadableStore +import com.twitter.strato.client.{Client => StratoClient} +import com.twitter.timelines.configapi +import com.twitter.timelines.configapi.CompositeConfig +import com.twitter.tsp.common.FeatureSwitchConfig +import com.twitter.tsp.common.FeatureSwitchesBuilder +import com.twitter.tsp.common.LoadShedder +import com.twitter.tsp.common.ParamsBuilder +import com.twitter.tsp.common.RecTargetFactory +import com.twitter.tsp.common.TopicSocialProofDecider +import com.twitter.tsp.handlers.TopicSocialProofHandler +import com.twitter.tsp.stores.LocalizedUttRecommendableTopicsStore +import com.twitter.tsp.stores.LocalizedUttTopicNameRequest +import com.twitter.tsp.stores.TopicResponses +import com.twitter.tsp.stores.TopicSocialProofStore +import com.twitter.tsp.stores.TopicSocialProofStore.TopicSocialProof +import com.twitter.tsp.stores.TopicStore +import com.twitter.tsp.stores.UttTopicFilterStore +import com.twitter.tsp.thriftscala.TopicSocialProofRequest +import com.twitter.tsp.thriftscala.TopicSocialProofResponse +import com.twitter.util.JavaTimer +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton +import com.twitter.topiclisting.TopicListing +import com.twitter.topiclisting.utt.UttLocalization + +@Singleton +class TopicSocialProofService @Inject() ( + topicSocialProofStore: ReadableStore[TopicSocialProofStore.Query, Seq[TopicSocialProof]], + tweetInfoStore: ReadableStore[TweetId, TspTweetInfo], + serviceIdentifier: ServiceIdentifier, + stratoClient: StratoClient, + gizmoduck: UserService.MethodPerEndpoint, + topicListing: TopicListing, + uttLocalization: UttLocalization, + decider: TopicSocialProofDecider, + loadShedder: LoadShedder, + stats: StatsReceiver) { + + import TopicSocialProofService._ + + private val statsReceiver = stats.scope("topic-social-proof-management") + + private val isProd: Boolean = serviceIdentifier.environment == "prod" + + private val optOutStratoStorePath: String = + if (isProd) "interests/optOutInterests" else "interests/staging/optOutInterests" + + private val notInterestedInStorePath: String = + if (isProd) "interests/notInterestedTopicsGetter" + else "interests/staging/notInterestedTopicsGetter" + + private val userOptOutTopicsStore: ReadableStore[UserId, TopicResponses] = + TopicStore.userOptOutTopicStore(stratoClient, optOutStratoStorePath)( + statsReceiver.scope("ints_interests_opt_out_store")) + private val explicitFollowingTopicsStore: ReadableStore[UserId, TopicResponses] = + TopicStore.explicitFollowingTopicStore(stratoClient)( + statsReceiver.scope("ints_explicit_following_interests_store")) + private val userNotInterestedInTopicsStore: ReadableStore[UserId, TopicResponses] = + TopicStore.notInterestedInTopicsStore(stratoClient, notInterestedInStorePath)( + statsReceiver.scope("ints_not_interested_in_store")) + + private lazy val localizedUttRecommendableTopicsStore: ReadableStore[ + LocalizedUttTopicNameRequest, + Set[ + SemanticCoreEntityId + ] + ] = new LocalizedUttRecommendableTopicsStore(uttLocalization) + + implicit val timer: Timer = new JavaTimer(true) + + private lazy val uttTopicFilterStore = new UttTopicFilterStore( + topicListing = topicListing, + userOptOutTopicsStore = userOptOutTopicsStore, + explicitFollowingTopicsStore = explicitFollowingTopicsStore, + notInterestedTopicsStore = userNotInterestedInTopicsStore, + localizedUttRecommendableTopicsStore = localizedUttRecommendableTopicsStore, + timer = timer, + stats = statsReceiver.scope("UttTopicFilterStore") + ) + + private lazy val scribeLogger: Option[Logger] = Some(Logger.get("client_event")) + + private lazy val abDecider: LoggingABDecider = + ABDeciderFactory( + abDeciderYmlPath = configRepoDirectory + "/abdecider/abdecider.yml", + scribeLogger = scribeLogger, + decider = None, + environment = Some("production"), + ).buildWithLogging() + + private val builder: FeatureSwitchesBuilder = FeatureSwitchesBuilder( + statsReceiver = statsReceiver.scope("featureswitches-v2"), + abDecider = abDecider, + featuresDirectory = "features/topic-social-proof/main", + configRepoDirectory = configRepoDirectory, + addServiceDetailsFromAurora = !serviceIdentifier.isLocal, + fastRefresh = !isProd + ) + + private lazy val overridesConfig: configapi.Config = { + new CompositeConfig( + Seq( + FeatureSwitchConfig.config + ) + ) + } + + private val featureContextBuilder: FeatureContextBuilder = FeatureContextBuilder(builder.build()) + + private val paramsBuilder: ParamsBuilder = ParamsBuilder( + featureContextBuilder, + abDecider, + overridesConfig, + statsReceiver.scope("params") + ) + + private val userStore: ReadableStore[UserId, User] = { + val queryFields: Set[QueryFields] = Set( + QueryFields.Profile, + QueryFields.Account, + QueryFields.Roles, + QueryFields.Discoverability, + QueryFields.Safety, + QueryFields.Takedowns + ) + val context: LookupContext = LookupContext(safetyLevel = Some(SafetyLevel.Recommendations)) + + GizmoduckUserStore( + client = gizmoduck, + queryFields = queryFields, + context = context, + statsReceiver = statsReceiver.scope("gizmoduck") + ) + } + + private val recTargetFactory: RecTargetFactory = RecTargetFactory( + abDecider, + userStore, + paramsBuilder, + statsReceiver + ) + + private val topicSocialProofHandler = + new TopicSocialProofHandler( + topicSocialProofStore, + tweetInfoStore, + uttTopicFilterStore, + recTargetFactory, + decider, + statsReceiver.scope("TopicSocialProofHandler"), + loadShedder, + timer) + + val topicSocialProofHandlerStoreStitch: TopicSocialProofRequest => com.twitter.stitch.Stitch[ + TopicSocialProofResponse + ] = StitchOfReadableStore(topicSocialProofHandler.toReadableStore) +} + +object TopicSocialProofService { + private val configRepoDirectory = "/usr/local/config" +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/BUILD b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/BUILD new file mode 100644 index 000000000..a933b3782 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/BUILD @@ -0,0 +1,32 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = [ + "bazel-compatible", + ], + dependencies = [ + "3rdparty/jvm/com/twitter/storehaus:core", + "content-recommender/thrift/src/main/thrift:thrift-scala", + "escherbird/src/thrift/com/twitter/escherbird/topicannotation:topicannotation-thrift-scala", + "frigate/frigate-common:util", + "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/health", + "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/interests", + "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/strato", + "hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common", + "mediaservices/commons/src/main/thrift:thrift-scala", + "src/scala/com/twitter/simclusters_v2/common", + "src/scala/com/twitter/simclusters_v2/score", + "src/scala/com/twitter/topic_recos/common", + "src/scala/com/twitter/topic_recos/stores", + "src/thrift/com/twitter/frigate:frigate-common-thrift-scala", + "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala", + "src/thrift/com/twitter/spam/rtf:safety-level-scala", + "src/thrift/com/twitter/tweetypie:service-scala", + "src/thrift/com/twitter/tweetypie:tweet-scala", + "stitch/stitch-storehaus", + "stitch/stitch-tweetypie/src/main/scala", + "strato/src/main/scala/com/twitter/strato/client", + "topic-social-proof/server/src/main/scala/com/twitter/tsp/utils", + "topic-social-proof/server/src/main/thrift:thrift-scala", + "topiclisting/topiclisting-core/src/main/scala/com/twitter/topiclisting", + ], +) diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/LocalizedUttRecommendableTopicsStore.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/LocalizedUttRecommendableTopicsStore.scala new file mode 100644 index 000000000..bcac9d5f6 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/LocalizedUttRecommendableTopicsStore.scala @@ -0,0 +1,30 @@ +package com.twitter.tsp.stores + +import com.twitter.storehaus.ReadableStore +import com.twitter.topiclisting.FollowableTopicProductId +import com.twitter.topiclisting.ProductId +import com.twitter.topiclisting.SemanticCoreEntityId +import com.twitter.topiclisting.TopicListingViewerContext +import com.twitter.topiclisting.utt.UttLocalization +import com.twitter.util.Future + +case class LocalizedUttTopicNameRequest( + productId: ProductId.Value, + viewerContext: TopicListingViewerContext, + enableInternationalTopics: Boolean) + +class LocalizedUttRecommendableTopicsStore(uttLocalization: UttLocalization) + extends ReadableStore[LocalizedUttTopicNameRequest, Set[SemanticCoreEntityId]] { + + override def get( + request: LocalizedUttTopicNameRequest + ): Future[Option[Set[SemanticCoreEntityId]]] = { + uttLocalization + .getRecommendableTopics( + productId = request.productId, + viewerContext = request.viewerContext, + enableInternationalTopics = request.enableInternationalTopics, + followableTopicProductId = FollowableTopicProductId.AllFollowable + ).map { response => Some(response) } + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/RepresentationScorerStore.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/RepresentationScorerStore.scala new file mode 100644 index 000000000..7d5095ca6 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/RepresentationScorerStore.scala @@ -0,0 +1,31 @@ +package com.twitter.tsp.stores + +import com.twitter.contentrecommender.thriftscala.ScoringResponse +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.common.store.strato.StratoFetchableStore +import com.twitter.hermit.store.common.ObservedReadableStore +import com.twitter.simclusters_v2.thriftscala.Score +import com.twitter.simclusters_v2.thriftscala.ScoreId +import com.twitter.storehaus.ReadableStore +import com.twitter.strato.client.Client +import com.twitter.strato.thrift.ScroogeConvImplicits._ +import com.twitter.tsp.utils.ReadableStoreWithMapOptionValues + +object RepresentationScorerStore { + + def apply( + stratoClient: Client, + scoringColumnPath: String, + stats: StatsReceiver + ): ReadableStore[ScoreId, Score] = { + val stratoFetchableStore = StratoFetchableStore + .withUnitView[ScoreId, ScoringResponse](stratoClient, scoringColumnPath) + + val enrichedStore = new ReadableStoreWithMapOptionValues[ScoreId, ScoringResponse, Score]( + stratoFetchableStore).mapOptionValues(_.score) + + ObservedReadableStore( + enrichedStore + )(stats.scope("representation_scorer_store")) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/SemanticCoreAnnotationStore.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/SemanticCoreAnnotationStore.scala new file mode 100644 index 000000000..cfeb7722b --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/SemanticCoreAnnotationStore.scala @@ -0,0 +1,64 @@ +package com.twitter.tsp.stores + +import com.twitter.escherbird.topicannotation.strato.thriftscala.TopicAnnotationValue +import com.twitter.escherbird.topicannotation.strato.thriftscala.TopicAnnotationView +import com.twitter.frigate.common.store.strato.StratoFetchableStore +import com.twitter.simclusters_v2.common.TopicId +import com.twitter.simclusters_v2.common.TweetId +import com.twitter.storehaus.ReadableStore +import com.twitter.strato.client.Client +import com.twitter.strato.thrift.ScroogeConvImplicits._ +import com.twitter.util.Future + +/** + * This is copied from `src/scala/com/twitter/topic_recos/stores/SemanticCoreAnnotationStore.scala` + * Unfortunately their version assumes (incorrectly) that there is no View which causes warnings. + * While these warnings may not cause any problems in practice, better safe than sorry. + */ +object SemanticCoreAnnotationStore { + private val column = "semanticCore/topicannotation/topicAnnotation.Tweet" + + def getStratoStore(stratoClient: Client): ReadableStore[TweetId, TopicAnnotationValue] = { + StratoFetchableStore + .withView[TweetId, TopicAnnotationView, TopicAnnotationValue]( + stratoClient, + column, + TopicAnnotationView()) + } + + case class TopicAnnotation( + topicId: TopicId, + ignoreSimClustersFilter: Boolean, + modelVersionId: Long) +} + +/** + * Given a tweet Id, return the list of annotations defined by the TSIG team. + */ +case class SemanticCoreAnnotationStore(stratoStore: ReadableStore[TweetId, TopicAnnotationValue]) + extends ReadableStore[TweetId, Seq[SemanticCoreAnnotationStore.TopicAnnotation]] { + import SemanticCoreAnnotationStore._ + + override def multiGet[K1 <: TweetId]( + ks: Set[K1] + ): Map[K1, Future[Option[Seq[TopicAnnotation]]]] = { + stratoStore + .multiGet(ks) + .mapValues(_.map(_.map { topicAnnotationValue => + topicAnnotationValue.annotationsPerModel match { + case Some(annotationWithVersions) => + annotationWithVersions.flatMap { annotations => + annotations.annotations.map { annotation => + TopicAnnotation( + annotation.entityId, + annotation.ignoreQualityFilter.getOrElse(false), + annotations.modelVersionId + ) + } + } + case _ => + Nil + } + })) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TopicSocialProofStore.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TopicSocialProofStore.scala new file mode 100644 index 000000000..6ed71ca14 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TopicSocialProofStore.scala @@ -0,0 +1,127 @@ +package com.twitter.tsp.stores + +import com.twitter.tsp.stores.TopicTweetsCosineSimilarityAggregateStore.ScoreKey +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.common.util.StatsUtil +import com.twitter.simclusters_v2.thriftscala._ +import com.twitter.storehaus.ReadableStore +import com.twitter.simclusters_v2.common.TweetId +import com.twitter.tsp.stores.SemanticCoreAnnotationStore._ +import com.twitter.tsp.stores.TopicSocialProofStore.TopicSocialProof +import com.twitter.util.Future + +/** + * Provides a session-less Topic Social Proof information which doesn't rely on any User Info. + * This store is used by MemCache and In-Memory cache to achieve a higher performance. + * One Consumer embedding and Producer embedding are used to calculate raw score. + */ +case class TopicSocialProofStore( + representationScorerStore: ReadableStore[ScoreId, Score], + semanticCoreAnnotationStore: ReadableStore[TweetId, Seq[TopicAnnotation]] +)( + statsReceiver: StatsReceiver) + extends ReadableStore[TopicSocialProofStore.Query, Seq[TopicSocialProof]] { + import TopicSocialProofStore._ + + // Fetches the tweet's topic annotations from SemanticCore's Annotation API + override def get(query: TopicSocialProofStore.Query): Future[Option[Seq[TopicSocialProof]]] = { + StatsUtil.trackOptionStats(statsReceiver) { + for { + annotations <- + StatsUtil.trackItemsStats(statsReceiver.scope("semanticCoreAnnotationStore")) { + semanticCoreAnnotationStore.get(query.cacheableQuery.tweetId).map(_.getOrElse(Nil)) + } + + filteredAnnotations = filterAnnotationsByAllowList(annotations, query) + + scoredTopics <- + StatsUtil.trackItemMapStats(statsReceiver.scope("scoreTopicTweetsTweetLanguage")) { + // de-dup identical topicIds + val uniqueTopicIds = filteredAnnotations.map { annotation => + TopicId(annotation.topicId, Some(query.cacheableQuery.tweetLanguage), country = None) + }.toSet + + if (query.cacheableQuery.enableCosineSimilarityScoreCalculation) { + scoreTopicTweets(query.cacheableQuery.tweetId, uniqueTopicIds) + } else { + Future.value(uniqueTopicIds.map(id => id -> Map.empty[ScoreKey, Double]).toMap) + } + } + + } yield { + if (scoredTopics.nonEmpty) { + val versionedTopicProofs = filteredAnnotations.map { annotation => + val topicId = + TopicId(annotation.topicId, Some(query.cacheableQuery.tweetLanguage), country = None) + + TopicSocialProof( + topicId, + scores = scoredTopics.getOrElse(topicId, Map.empty), + annotation.ignoreSimClustersFilter, + annotation.modelVersionId + ) + } + Some(versionedTopicProofs) + } else { + None + } + } + } + } + + /*** + * When the allowList is not empty (e.g., TSP handler call, CrTopic handler call), + * the filter will be enabled and we will only keep annotations that have versionIds existing + * in the input allowedSemanticCoreVersionIds set. + * But when the allowList is empty (e.g., some debugger calls), + * we will not filter anything and pass. + * We limit the number of versionIds to be K = MaxNumberVersionIds + */ + private def filterAnnotationsByAllowList( + annotations: Seq[TopicAnnotation], + query: TopicSocialProofStore.Query + ): Seq[TopicAnnotation] = { + + val trimmedVersionIds = query.allowedSemanticCoreVersionIds.take(MaxNumberVersionIds) + annotations.filter { annotation => + trimmedVersionIds.isEmpty || trimmedVersionIds.contains(annotation.modelVersionId) + } + } + + private def scoreTopicTweets( + tweetId: TweetId, + topicIds: Set[TopicId] + ): Future[Map[TopicId, Map[ScoreKey, Double]]] = { + Future.collect { + topicIds.map { topicId => + val scoresFut = TopicTweetsCosineSimilarityAggregateStore.getRawScoresMap( + topicId, + tweetId, + TopicTweetsCosineSimilarityAggregateStore.DefaultScoreKeys, + representationScorerStore + ) + topicId -> scoresFut + }.toMap + } + } +} + +object TopicSocialProofStore { + + private val MaxNumberVersionIds = 9 + + case class Query( + cacheableQuery: CacheableQuery, + allowedSemanticCoreVersionIds: Set[Long] = Set.empty) // overridden by FS + + case class CacheableQuery( + tweetId: TweetId, + tweetLanguage: String, + enableCosineSimilarityScoreCalculation: Boolean = true) + + case class TopicSocialProof( + topicId: TopicId, + scores: Map[ScoreKey, Double], + ignoreSimClusterFiltering: Boolean, + semanticCoreVersionId: Long) +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TopicStore.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TopicStore.scala new file mode 100644 index 000000000..61fae8c6a --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TopicStore.scala @@ -0,0 +1,135 @@ +package com.twitter.tsp.stores + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.common.store.InterestedInInterestsFetchKey +import com.twitter.frigate.common.store.strato.StratoFetchableStore +import com.twitter.hermit.store.common.ObservedReadableStore +import com.twitter.interests.thriftscala.InterestId +import com.twitter.interests.thriftscala.InterestLabel +import com.twitter.interests.thriftscala.InterestRelationship +import com.twitter.interests.thriftscala.InterestRelationshipV1 +import com.twitter.interests.thriftscala.InterestedInInterestLookupContext +import com.twitter.interests.thriftscala.InterestedInInterestModel +import com.twitter.interests.thriftscala.OptOutInterestLookupContext +import com.twitter.interests.thriftscala.UserInterest +import com.twitter.interests.thriftscala.UserInterestData +import com.twitter.interests.thriftscala.UserInterestsResponse +import com.twitter.simclusters_v2.common.UserId +import com.twitter.storehaus.ReadableStore +import com.twitter.strato.client.Client +import com.twitter.strato.thrift.ScroogeConvImplicits._ + +case class TopicResponse( + entityId: Long, + interestedInData: Seq[InterestedInInterestModel], + scoreOverride: Option[Double] = None, + notInterestedInTimestamp: Option[Long] = None, + topicFollowTimestamp: Option[Long] = None) + +case class TopicResponses(responses: Seq[TopicResponse]) + +object TopicStore { + + private val InterestedInInterestsColumn = "interests/interestedInInterests" + private lazy val ExplicitInterestsContext: InterestedInInterestLookupContext = + InterestedInInterestLookupContext( + explicitContext = None, + inferredContext = None, + disableImplicit = Some(true) + ) + + private def userInterestsResponseToTopicResponse( + userInterestsResponse: UserInterestsResponse + ): TopicResponses = { + val responses = userInterestsResponse.interests.interests.toSeq.flatMap { userInterests => + userInterests.collect { + case UserInterest( + InterestId.SemanticCore(semanticCoreEntity), + Some(UserInterestData.InterestedIn(data))) => + val topicFollowingTimestampOpt = data.collect { + case InterestedInInterestModel.ExplicitModel( + InterestRelationship.V1(interestRelationshipV1)) => + interestRelationshipV1.timestampMs + }.lastOption + + TopicResponse(semanticCoreEntity.id, data, None, None, topicFollowingTimestampOpt) + } + } + TopicResponses(responses) + } + + def explicitFollowingTopicStore( + stratoClient: Client + )( + implicit statsReceiver: StatsReceiver + ): ReadableStore[UserId, TopicResponses] = { + val stratoStore = + StratoFetchableStore + .withUnitView[InterestedInInterestsFetchKey, UserInterestsResponse]( + stratoClient, + InterestedInInterestsColumn) + .composeKeyMapping[UserId](uid => + InterestedInInterestsFetchKey( + userId = uid, + labels = None, + lookupContext = Some(ExplicitInterestsContext) + )) + .mapValues(userInterestsResponseToTopicResponse) + + ObservedReadableStore(stratoStore) + } + + def userOptOutTopicStore( + stratoClient: Client, + optOutStratoStorePath: String + )( + implicit statsReceiver: StatsReceiver + ): ReadableStore[UserId, TopicResponses] = { + val stratoStore = + StratoFetchableStore + .withUnitView[ + (Long, Option[Seq[InterestLabel]], Option[OptOutInterestLookupContext]), + UserInterestsResponse](stratoClient, optOutStratoStorePath) + .composeKeyMapping[UserId](uid => (uid, None, None)) + .mapValues { userInterestsResponse => + val responses = userInterestsResponse.interests.interests.toSeq.flatMap { userInterests => + userInterests.collect { + case UserInterest( + InterestId.SemanticCore(semanticCoreEntity), + Some(UserInterestData.InterestedIn(data))) => + TopicResponse(semanticCoreEntity.id, data, None) + } + } + TopicResponses(responses) + } + ObservedReadableStore(stratoStore) + } + + def notInterestedInTopicsStore( + stratoClient: Client, + notInterestedInStorePath: String + )( + implicit statsReceiver: StatsReceiver + ): ReadableStore[UserId, TopicResponses] = { + val stratoStore = + StratoFetchableStore + .withUnitView[Long, Seq[UserInterest]](stratoClient, notInterestedInStorePath) + .composeKeyMapping[UserId](identity) + .mapValues { notInterestedInInterests => + val responses = notInterestedInInterests.collect { + case UserInterest( + InterestId.SemanticCore(semanticCoreEntity), + Some(UserInterestData.NotInterested(notInterestedInData))) => + val notInterestedInTimestampOpt = notInterestedInData.collect { + case InterestRelationship.V1(interestRelationshipV1: InterestRelationshipV1) => + interestRelationshipV1.timestampMs + }.lastOption + + TopicResponse(semanticCoreEntity.id, Seq.empty, None, notInterestedInTimestampOpt) + } + TopicResponses(responses) + } + ObservedReadableStore(stratoStore) + } + +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TopicTweetsCosineSimilarityAggregateStore.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TopicTweetsCosineSimilarityAggregateStore.scala new file mode 100644 index 000000000..3fb65d8ac --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TopicTweetsCosineSimilarityAggregateStore.scala @@ -0,0 +1,99 @@ +package com.twitter.tsp.stores + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.simclusters_v2.common.TweetId +import com.twitter.simclusters_v2.thriftscala.EmbeddingType +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.simclusters_v2.thriftscala.ModelVersion +import com.twitter.simclusters_v2.thriftscala.ScoreInternalId +import com.twitter.simclusters_v2.thriftscala.ScoringAlgorithm +import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId +import com.twitter.simclusters_v2.thriftscala.{ + SimClustersEmbeddingPairScoreId => ThriftSimClustersEmbeddingPairScoreId +} +import com.twitter.simclusters_v2.thriftscala.TopicId +import com.twitter.simclusters_v2.thriftscala.{Score => ThriftScore} +import com.twitter.simclusters_v2.thriftscala.{ScoreId => ThriftScoreId} +import com.twitter.storehaus.ReadableStore +import com.twitter.topic_recos.common._ +import com.twitter.topic_recos.common.Configs.DefaultModelVersion +import com.twitter.tsp.stores.TopicTweetsCosineSimilarityAggregateStore.ScoreKey +import com.twitter.util.Future + +object TopicTweetsCosineSimilarityAggregateStore { + + val TopicEmbeddingTypes: Seq[EmbeddingType] = + Seq( + EmbeddingType.FavTfgTopic, + EmbeddingType.LogFavBasedKgoApeTopic + ) + + // Add the new embedding types if want to test the new Tweet embedding performance. + val TweetEmbeddingTypes: Seq[EmbeddingType] = Seq(EmbeddingType.LogFavBasedTweet) + + val ModelVersions: Seq[ModelVersion] = + Seq(DefaultModelVersion) + + val DefaultScoreKeys: Seq[ScoreKey] = { + for { + modelVersion <- ModelVersions + topicEmbeddingType <- TopicEmbeddingTypes + tweetEmbeddingType <- TweetEmbeddingTypes + } yield { + ScoreKey( + topicEmbeddingType = topicEmbeddingType, + tweetEmbeddingType = tweetEmbeddingType, + modelVersion = modelVersion + ) + } + } + + case class ScoreKey( + topicEmbeddingType: EmbeddingType, + tweetEmbeddingType: EmbeddingType, + modelVersion: ModelVersion) + + def getRawScoresMap( + topicId: TopicId, + tweetId: TweetId, + scoreKeys: Seq[ScoreKey], + representationScorerStore: ReadableStore[ThriftScoreId, ThriftScore] + ): Future[Map[ScoreKey, Double]] = { + val scoresMapFut = scoreKeys.map { key => + val scoreInternalId = ScoreInternalId.SimClustersEmbeddingPairScoreId( + ThriftSimClustersEmbeddingPairScoreId( + buildTopicEmbedding(topicId, key.topicEmbeddingType, key.modelVersion), + SimClustersEmbeddingId( + key.tweetEmbeddingType, + key.modelVersion, + InternalId.TweetId(tweetId)) + )) + val scoreFut = representationScorerStore + .get( + ThriftScoreId( + algorithm = ScoringAlgorithm.PairEmbeddingCosineSimilarity, // Hard code as cosine sim + internalId = scoreInternalId + )) + key -> scoreFut + }.toMap + + Future + .collect(scoresMapFut).map(_.collect { + case (key, Some(ThriftScore(score))) => + (key, score) + }) + } +} + +case class TopicTweetsCosineSimilarityAggregateStore( + representationScorerStore: ReadableStore[ThriftScoreId, ThriftScore] +)( + statsReceiver: StatsReceiver) + extends ReadableStore[(TopicId, TweetId, Seq[ScoreKey]), Map[ScoreKey, Double]] { + import TopicTweetsCosineSimilarityAggregateStore._ + + override def get(k: (TopicId, TweetId, Seq[ScoreKey])): Future[Option[Map[ScoreKey, Double]]] = { + statsReceiver.counter("topicTweetsCosineSimilariltyAggregateStore").incr() + getRawScoresMap(k._1, k._2, k._3, representationScorerStore).map(Some(_)) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TweetInfoStore.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TweetInfoStore.scala new file mode 100644 index 000000000..70cc00451 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/TweetInfoStore.scala @@ -0,0 +1,230 @@ +package com.twitter.tsp.stores + +import com.twitter.conversions.DurationOps._ +import com.twitter.tsp.thriftscala.TspTweetInfo +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.thriftscala.TweetHealthScores +import com.twitter.frigate.thriftscala.UserAgathaScores +import com.twitter.logging.Logger +import com.twitter.mediaservices.commons.thriftscala.MediaCategory +import com.twitter.mediaservices.commons.tweetmedia.thriftscala.MediaInfo +import com.twitter.mediaservices.commons.tweetmedia.thriftscala.MediaSizeType +import com.twitter.simclusters_v2.common.TweetId +import com.twitter.simclusters_v2.common.UserId +import com.twitter.spam.rtf.thriftscala.SafetyLevel +import com.twitter.stitch.Stitch +import com.twitter.stitch.storehaus.ReadableStoreOfStitch +import com.twitter.stitch.tweetypie.TweetyPie +import com.twitter.stitch.tweetypie.TweetyPie.TweetyPieException +import com.twitter.storehaus.ReadableStore +import com.twitter.topiclisting.AnnotationRuleProvider +import com.twitter.tsp.utils.HealthSignalsUtils +import com.twitter.tweetypie.thriftscala.TweetInclude +import com.twitter.tweetypie.thriftscala.{Tweet => TTweet} +import com.twitter.tweetypie.thriftscala._ +import com.twitter.util.Duration +import com.twitter.util.Future +import com.twitter.util.TimeoutException +import com.twitter.util.Timer + +object TweetyPieFieldsStore { + + // Tweet fields options. Only fields specified here will be hydrated in the tweet + private val CoreTweetFields: Set[TweetInclude] = Set[TweetInclude]( + TweetInclude.TweetFieldId(TTweet.IdField.id), + TweetInclude.TweetFieldId(TTweet.CoreDataField.id), // needed for the authorId + TweetInclude.TweetFieldId(TTweet.LanguageField.id), + TweetInclude.CountsFieldId(StatusCounts.FavoriteCountField.id), + TweetInclude.CountsFieldId(StatusCounts.RetweetCountField.id), + TweetInclude.TweetFieldId(TTweet.QuotedTweetField.id), + TweetInclude.TweetFieldId(TTweet.MediaKeysField.id), + TweetInclude.TweetFieldId(TTweet.EscherbirdEntityAnnotationsField.id), + TweetInclude.TweetFieldId(TTweet.MediaField.id), + TweetInclude.TweetFieldId(TTweet.UrlsField.id) + ) + + private val gtfo: GetTweetFieldsOptions = GetTweetFieldsOptions( + tweetIncludes = CoreTweetFields, + safetyLevel = Some(SafetyLevel.Recommendations) + ) + + def getStoreFromTweetyPie( + tweetyPie: TweetyPie, + convertExceptionsToNotFound: Boolean = true + ): ReadableStore[Long, GetTweetFieldsResult] = { + val log = Logger("TweetyPieFieldsStore") + + ReadableStoreOfStitch { tweetId: Long => + tweetyPie + .getTweetFields(tweetId, options = gtfo) + .rescue { + case ex: TweetyPieException if convertExceptionsToNotFound => + log.error(ex, s"Error while hitting tweetypie ${ex.result}") + Stitch.NotFound + } + } + } +} + +object TweetInfoStore { + + case class IsPassTweetHealthFilters(tweetStrictest: Option[Boolean]) + + case class IsPassAgathaHealthFilters(agathaStrictest: Option[Boolean]) + + private val HealthStoreTimeout: Duration = 40.milliseconds + private val isPassTweetHealthFilters: IsPassTweetHealthFilters = IsPassTweetHealthFilters(None) + private val isPassAgathaHealthFilters: IsPassAgathaHealthFilters = IsPassAgathaHealthFilters(None) +} + +case class TweetInfoStore( + tweetFieldsStore: ReadableStore[TweetId, GetTweetFieldsResult], + tweetHealthModelStore: ReadableStore[TweetId, TweetHealthScores], + userHealthModelStore: ReadableStore[UserId, UserAgathaScores], + timer: Timer +)( + statsReceiver: StatsReceiver) + extends ReadableStore[TweetId, TspTweetInfo] { + + import TweetInfoStore._ + + private[this] def toTweetInfo( + tweetFieldsResult: GetTweetFieldsResult + ): Future[Option[TspTweetInfo]] = { + tweetFieldsResult.tweetResult match { + case result: TweetFieldsResultState.Found if result.found.suppressReason.isEmpty => + val tweet = result.found.tweet + + val authorIdOpt = tweet.coreData.map(_.userId) + val favCountOpt = tweet.counts.flatMap(_.favoriteCount) + + val languageOpt = tweet.language.map(_.language) + val hasImageOpt = + tweet.mediaKeys.map(_.map(_.mediaCategory).exists(_ == MediaCategory.TweetImage)) + val hasGifOpt = + tweet.mediaKeys.map(_.map(_.mediaCategory).exists(_ == MediaCategory.TweetGif)) + val isNsfwAuthorOpt = Some( + tweet.coreData.exists(_.nsfwUser) || tweet.coreData.exists(_.nsfwAdmin)) + val isTweetReplyOpt = tweet.coreData.map(_.reply.isDefined) + val hasMultipleMediaOpt = + tweet.mediaKeys.map(_.map(_.mediaCategory).size > 1) + + val isKGODenylist = Some( + tweet.escherbirdEntityAnnotations + .exists(_.entityAnnotations.exists(AnnotationRuleProvider.isSuppressedTopicsDenylist))) + + val isNullcastOpt = tweet.coreData.map(_.nullcast) // These are Ads. go/nullcast + + val videoDurationOpt = tweet.media.flatMap(_.flatMap { + _.mediaInfo match { + case Some(MediaInfo.VideoInfo(info)) => + Some((info.durationMillis + 999) / 1000) // video playtime always round up + case _ => None + } + }.headOption) + + // There many different types of videos. To be robust to new types being added, we just use + // the videoDurationOpt to keep track of whether the item has a video or not. + val hasVideo = videoDurationOpt.isDefined + + val mediaDimensionsOpt = + tweet.media.flatMap(_.headOption.flatMap( + _.sizes.find(_.sizeType == MediaSizeType.Orig).map(size => (size.width, size.height)))) + + val mediaWidth = mediaDimensionsOpt.map(_._1).getOrElse(1) + val mediaHeight = mediaDimensionsOpt.map(_._2).getOrElse(1) + // high resolution media's width is always greater than 480px and height is always greater than 480px + val isHighMediaResolution = mediaHeight > 480 && mediaWidth > 480 + val isVerticalAspectRatio = mediaHeight >= mediaWidth && mediaWidth > 1 + val hasUrlOpt = tweet.urls.map(_.nonEmpty) + + (authorIdOpt, favCountOpt) match { + case (Some(authorId), Some(favCount)) => + hydrateHealthScores(tweet.id, authorId).map { + case (isPassAgathaHealthFilters, isPassTweetHealthFilters) => + Some( + TspTweetInfo( + authorId = authorId, + favCount = favCount, + language = languageOpt, + hasImage = hasImageOpt, + hasVideo = Some(hasVideo), + hasGif = hasGifOpt, + isNsfwAuthor = isNsfwAuthorOpt, + isKGODenylist = isKGODenylist, + isNullcast = isNullcastOpt, + videoDurationSeconds = videoDurationOpt, + isHighMediaResolution = Some(isHighMediaResolution), + isVerticalAspectRatio = Some(isVerticalAspectRatio), + isPassAgathaHealthFilterStrictest = isPassAgathaHealthFilters.agathaStrictest, + isPassTweetHealthFilterStrictest = isPassTweetHealthFilters.tweetStrictest, + isReply = isTweetReplyOpt, + hasMultipleMedia = hasMultipleMediaOpt, + hasUrl = hasUrlOpt + )) + } + case _ => + statsReceiver.counter("missingFields").incr() + Future.None // These values should always exist. + } + case _: TweetFieldsResultState.NotFound => + statsReceiver.counter("notFound").incr() + Future.None + case _: TweetFieldsResultState.Failed => + statsReceiver.counter("failed").incr() + Future.None + case _: TweetFieldsResultState.Filtered => + statsReceiver.counter("filtered").incr() + Future.None + case _ => + statsReceiver.counter("unknown").incr() + Future.None + } + } + + private[this] def hydrateHealthScores( + tweetId: TweetId, + authorId: Long + ): Future[(IsPassAgathaHealthFilters, IsPassTweetHealthFilters)] = { + Future + .join( + tweetHealthModelStore + .multiGet(Set(tweetId))(tweetId), + userHealthModelStore + .multiGet(Set(authorId))(authorId) + ).map { + case (tweetHealthScoresOpt, userAgathaScoresOpt) => + // This stats help us understand empty rate for AgathaCalibratedNsfw / NsfwTextUserScore + statsReceiver.counter("totalCountAgathaScore").incr() + if (userAgathaScoresOpt.getOrElse(UserAgathaScores()).agathaCalibratedNsfw.isEmpty) + statsReceiver.counter("emptyCountAgathaCalibratedNsfw").incr() + if (userAgathaScoresOpt.getOrElse(UserAgathaScores()).nsfwTextUserScore.isEmpty) + statsReceiver.counter("emptyCountNsfwTextUserScore").incr() + + val isPassAgathaHealthFilters = IsPassAgathaHealthFilters( + agathaStrictest = + Some(HealthSignalsUtils.isTweetAgathaModelQualified(userAgathaScoresOpt)), + ) + + val isPassTweetHealthFilters = IsPassTweetHealthFilters( + tweetStrictest = + Some(HealthSignalsUtils.isTweetHealthModelQualified(tweetHealthScoresOpt)) + ) + + (isPassAgathaHealthFilters, isPassTweetHealthFilters) + }.raiseWithin(HealthStoreTimeout)(timer).rescue { + case _: TimeoutException => + statsReceiver.counter("hydrateHealthScoreTimeout").incr() + Future.value((isPassAgathaHealthFilters, isPassTweetHealthFilters)) + case _ => + statsReceiver.counter("hydrateHealthScoreFailure").incr() + Future.value((isPassAgathaHealthFilters, isPassTweetHealthFilters)) + } + } + + override def multiGet[K1 <: TweetId](ks: Set[K1]): Map[K1, Future[Option[TspTweetInfo]]] = { + statsReceiver.counter("tweetFieldsStore").incr(ks.size) + tweetFieldsStore + .multiGet(ks).mapValues(_.flatMap { _.map { v => toTweetInfo(v) }.getOrElse(Future.None) }) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/UttTopicFilterStore.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/UttTopicFilterStore.scala new file mode 100644 index 000000000..89a502008 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/stores/UttTopicFilterStore.scala @@ -0,0 +1,248 @@ +package com.twitter.tsp.stores + +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle.FailureFlags.flagsOf +import com.twitter.finagle.mux.ClientDiscardedRequestException +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.common.store.interests +import com.twitter.simclusters_v2.common.UserId +import com.twitter.storehaus.ReadableStore +import com.twitter.topiclisting.ProductId +import com.twitter.topiclisting.TopicListing +import com.twitter.topiclisting.TopicListingViewerContext +import com.twitter.topiclisting.{SemanticCoreEntityId => ScEntityId} +import com.twitter.tsp.thriftscala.TopicFollowType +import com.twitter.tsp.thriftscala.TopicListingSetting +import com.twitter.tsp.thriftscala.TopicSocialProofFilteringBypassMode +import com.twitter.util.Duration +import com.twitter.util.Future +import com.twitter.util.TimeoutException +import com.twitter.util.Timer + +class UttTopicFilterStore( + topicListing: TopicListing, + userOptOutTopicsStore: ReadableStore[interests.UserId, TopicResponses], + explicitFollowingTopicsStore: ReadableStore[interests.UserId, TopicResponses], + notInterestedTopicsStore: ReadableStore[interests.UserId, TopicResponses], + localizedUttRecommendableTopicsStore: ReadableStore[LocalizedUttTopicNameRequest, Set[Long]], + timer: Timer, + stats: StatsReceiver) { + import UttTopicFilterStore._ + + // Set of blacklisted SemanticCore IDs that are paused. + private[this] def getPausedTopics(topicCtx: TopicListingViewerContext): Set[ScEntityId] = { + topicListing.getPausedTopics(topicCtx) + } + + private[this] def getOptOutTopics(userId: Long): Future[Set[ScEntityId]] = { + stats.counter("getOptOutTopicsCount").incr() + userOptOutTopicsStore + .get(userId).map { responseOpt => + responseOpt + .map { responses => responses.responses.map(_.entityId) }.getOrElse(Seq.empty).toSet + }.raiseWithin(DefaultOptOutTimeout)(timer).rescue { + case err: TimeoutException => + stats.counter("getOptOutTopicsTimeout").incr() + Future.exception(err) + case err: ClientDiscardedRequestException + if flagsOf(err).contains("interrupted") && flagsOf(err) + .contains("ignorable") => + stats.counter("getOptOutTopicsDiscardedBackupRequest").incr() + Future.exception(err) + case err => + stats.counter("getOptOutTopicsFailure").incr() + Future.exception(err) + } + } + + private[this] def getNotInterestedIn(userId: Long): Future[Set[ScEntityId]] = { + stats.counter("getNotInterestedInCount").incr() + notInterestedTopicsStore + .get(userId).map { responseOpt => + responseOpt + .map { responses => responses.responses.map(_.entityId) }.getOrElse(Seq.empty).toSet + }.raiseWithin(DefaultNotInterestedInTimeout)(timer).rescue { + case err: TimeoutException => + stats.counter("getNotInterestedInTimeout").incr() + Future.exception(err) + case err: ClientDiscardedRequestException + if flagsOf(err).contains("interrupted") && flagsOf(err) + .contains("ignorable") => + stats.counter("getNotInterestedInDiscardedBackupRequest").incr() + Future.exception(err) + case err => + stats.counter("getNotInterestedInFailure").incr() + Future.exception(err) + } + } + + private[this] def getFollowedTopics(userId: Long): Future[Set[TopicResponse]] = { + stats.counter("getFollowedTopicsCount").incr() + + explicitFollowingTopicsStore + .get(userId).map { responseOpt => + responseOpt.map(_.responses.toSet).getOrElse(Set.empty) + }.raiseWithin(DefaultInterestedInTimeout)(timer).rescue { + case _: TimeoutException => + stats.counter("getFollowedTopicsTimeout").incr() + Future(Set.empty) + case _ => + stats.counter("getFollowedTopicsFailure").incr() + Future(Set.empty) + } + } + + private[this] def getFollowedTopicIds(userId: Long): Future[Set[ScEntityId]] = { + getFollowedTopics(userId: Long).map(_.map(_.entityId)) + } + + private[this] def getWhitelistTopicIds( + normalizedContext: TopicListingViewerContext, + enableInternationalTopics: Boolean + ): Future[Set[ScEntityId]] = { + stats.counter("getWhitelistTopicIdsCount").incr() + + val uttRequest = LocalizedUttTopicNameRequest( + productId = ProductId.Followable, + viewerContext = normalizedContext, + enableInternationalTopics = enableInternationalTopics + ) + localizedUttRecommendableTopicsStore + .get(uttRequest).map { response => + response.getOrElse(Set.empty) + }.rescue { + case _ => + stats.counter("getWhitelistTopicIdsFailure").incr() + Future(Set.empty) + } + } + + private[this] def getDenyListTopicIdsForUser( + userId: UserId, + topicListingSetting: TopicListingSetting, + context: TopicListingViewerContext, + bypassModes: Option[Set[TopicSocialProofFilteringBypassMode]] + ): Future[Set[ScEntityId]] = { + + val denyListTopicIdsFuture = topicListingSetting match { + case TopicListingSetting.ImplicitFollow => + getFollowedTopicIds(userId) + case _ => + Future(Set.empty[ScEntityId]) + } + + // we don't filter opt-out topics for implicit follow topic listing setting + val optOutTopicIdsFuture = topicListingSetting match { + case TopicListingSetting.ImplicitFollow => Future(Set.empty[ScEntityId]) + case _ => getOptOutTopics(userId) + } + + val notInterestedTopicIdsFuture = + if (bypassModes.exists(_.contains(TopicSocialProofFilteringBypassMode.NotInterested))) { + Future(Set.empty[ScEntityId]) + } else { + getNotInterestedIn(userId) + } + val pausedTopicIdsFuture = Future.value(getPausedTopics(context)) + + Future + .collect( + List( + denyListTopicIdsFuture, + optOutTopicIdsFuture, + notInterestedTopicIdsFuture, + pausedTopicIdsFuture)).map { list => list.reduce(_ ++ _) } + } + + private[this] def getDiff( + aFut: Future[Set[ScEntityId]], + bFut: Future[Set[ScEntityId]] + ): Future[Set[ScEntityId]] = { + Future.join(aFut, bFut).map { + case (a, b) => a.diff(b) + } + } + + /** + * calculates the diff of all the whitelisted IDs with blacklisted IDs and returns the set of IDs + * that we will be recommending from or followed topics by the user by client setting. + */ + def getAllowListTopicsForUser( + userId: UserId, + topicListingSetting: TopicListingSetting, + context: TopicListingViewerContext, + bypassModes: Option[Set[TopicSocialProofFilteringBypassMode]] + ): Future[Map[ScEntityId, Option[TopicFollowType]]] = { + + /** + * Title: an illustrative table to explain how allow list is composed + * AllowList = WhiteList - DenyList - OptOutTopics - PausedTopics - NotInterestedInTopics + * + * TopicListingSetting: Following ImplicitFollow All Followable + * Whitelist: FollowedTopics(user) AllWhitelistedTopics Nil AllWhitelistedTopics + * DenyList: Nil FollowedTopics(user) Nil Nil + * + * ps. for TopicListingSetting.All, the returned allow list is Nil. Why? + * It's because that allowList is not required given the TopicListingSetting == 'All'. + * See TopicSocialProofHandler.filterByAllowedList() for more details. + */ + + topicListingSetting match { + // "All" means all the UTT entity is qualified. So don't need to fetch the Whitelist anymore. + case TopicListingSetting.All => Future.value(Map.empty) + case TopicListingSetting.Following => + getFollowingTopicsForUserWithTimestamp(userId, context, bypassModes).map { + _.mapValues(_ => Some(TopicFollowType.Following)) + } + case TopicListingSetting.ImplicitFollow => + getDiff( + getWhitelistTopicIds(context, enableInternationalTopics = true), + getDenyListTopicIdsForUser(userId, topicListingSetting, context, bypassModes)).map { + _.map { scEntityId => + scEntityId -> Some(TopicFollowType.ImplicitFollow) + }.toMap + } + case _ => + val followedTopicIdsFut = getFollowedTopicIds(userId) + val allowListTopicIdsFut = getDiff( + getWhitelistTopicIds(context, enableInternationalTopics = true), + getDenyListTopicIdsForUser(userId, topicListingSetting, context, bypassModes)) + Future.join(allowListTopicIdsFut, followedTopicIdsFut).map { + case (allowListTopicId, followedTopicIds) => + allowListTopicId.map { scEntityId => + if (followedTopicIds.contains(scEntityId)) + scEntityId -> Some(TopicFollowType.Following) + else scEntityId -> Some(TopicFollowType.ImplicitFollow) + }.toMap + } + } + } + + private[this] def getFollowingTopicsForUserWithTimestamp( + userId: UserId, + context: TopicListingViewerContext, + bypassModes: Option[Set[TopicSocialProofFilteringBypassMode]] + ): Future[Map[ScEntityId, Option[Long]]] = { + + val followedTopicIdToTimestampFut = getFollowedTopics(userId).map(_.map { followedTopic => + followedTopic.entityId -> followedTopic.topicFollowTimestamp + }.toMap) + + followedTopicIdToTimestampFut.flatMap { followedTopicIdToTimestamp => + getDiff( + Future(followedTopicIdToTimestamp.keySet), + getDenyListTopicIdsForUser(userId, TopicListingSetting.Following, context, bypassModes) + ).map { + _.map { scEntityId => + scEntityId -> followedTopicIdToTimestamp.get(scEntityId).flatten + }.toMap + } + } + } +} + +object UttTopicFilterStore { + val DefaultNotInterestedInTimeout: Duration = 60.milliseconds + val DefaultOptOutTimeout: Duration = 60.milliseconds + val DefaultInterestedInTimeout: Duration = 60.milliseconds +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/BUILD b/topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/BUILD new file mode 100644 index 000000000..3f4c6f42c --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/BUILD @@ -0,0 +1,14 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = [ + "bazel-compatible", + ], + dependencies = [ + "3rdparty/jvm/org/lz4:lz4-java", + "content-recommender/thrift/src/main/thrift:thrift-scala", + "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store", + "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/health", + "stitch/stitch-storehaus", + "topic-social-proof/server/src/main/thrift:thrift-scala", + ], +) diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/LZ4Injection.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/LZ4Injection.scala new file mode 100644 index 000000000..c72b6032f --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/LZ4Injection.scala @@ -0,0 +1,19 @@ +package com.twitter.tsp.utils + +import com.twitter.bijection.Injection +import scala.util.Try +import net.jpountz.lz4.LZ4CompressorWithLength +import net.jpountz.lz4.LZ4DecompressorWithLength +import net.jpountz.lz4.LZ4Factory + +object LZ4Injection extends Injection[Array[Byte], Array[Byte]] { + private val lz4Factory = LZ4Factory.fastestInstance() + private val fastCompressor = new LZ4CompressorWithLength(lz4Factory.fastCompressor()) + private val decompressor = new LZ4DecompressorWithLength(lz4Factory.fastDecompressor()) + + override def apply(a: Array[Byte]): Array[Byte] = LZ4Injection.fastCompressor.compress(a) + + override def invert(b: Array[Byte]): Try[Array[Byte]] = Try { + LZ4Injection.decompressor.decompress(b) + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/ReadableStoreWithMapOptionValues.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/ReadableStoreWithMapOptionValues.scala new file mode 100644 index 000000000..ddae5a310 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/ReadableStoreWithMapOptionValues.scala @@ -0,0 +1,20 @@ +package com.twitter.tsp.utils + +import com.twitter.storehaus.AbstractReadableStore +import com.twitter.storehaus.ReadableStore +import com.twitter.util.Future + +class ReadableStoreWithMapOptionValues[K, V1, V2](rs: ReadableStore[K, V1]) { + + def mapOptionValues( + fn: V1 => Option[V2] + ): ReadableStore[K, V2] = { + val self = rs + new AbstractReadableStore[K, V2] { + override def get(k: K): Future[Option[V2]] = self.get(k).map(_.flatMap(fn)) + + override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V2]]] = + self.multiGet(ks).mapValues(_.map(_.flatMap(fn))) + } + } +} diff --git a/topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/SeqObjectInjection.scala b/topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/SeqObjectInjection.scala new file mode 100644 index 000000000..96a0740e4 --- /dev/null +++ b/topic-social-proof/server/src/main/scala/com/twitter/tsp/utils/SeqObjectInjection.scala @@ -0,0 +1,32 @@ +package com.twitter.tsp.utils + +import com.twitter.bijection.Injection +import java.io.ByteArrayInputStream +import java.io.ByteArrayOutputStream +import java.io.ObjectInputStream +import java.io.ObjectOutputStream +import java.io.Serializable +import scala.util.Try + +/** + * @tparam T must be a serializable class + */ +case class SeqObjectInjection[T <: Serializable]() extends Injection[Seq[T], Array[Byte]] { + + override def apply(seq: Seq[T]): Array[Byte] = { + val byteStream = new ByteArrayOutputStream() + val outputStream = new ObjectOutputStream(byteStream) + outputStream.writeObject(seq) + outputStream.close() + byteStream.toByteArray + } + + override def invert(bytes: Array[Byte]): Try[Seq[T]] = { + Try { + val inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes)) + val seq = inputStream.readObject().asInstanceOf[Seq[T]] + inputStream.close() + seq + } + } +} diff --git a/topic-social-proof/server/src/main/thrift/BUILD b/topic-social-proof/server/src/main/thrift/BUILD new file mode 100644 index 000000000..9bdbb71e0 --- /dev/null +++ b/topic-social-proof/server/src/main/thrift/BUILD @@ -0,0 +1,21 @@ +create_thrift_libraries( + base_name = "thrift", + sources = ["*.thrift"], + platform = "java8", + tags = [ + "bazel-compatible", + ], + dependency_roots = [ + "content-recommender/thrift/src/main/thrift", + "content-recommender/thrift/src/main/thrift:content-recommender-common", + "interests-service/thrift/src/main/thrift", + "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift", + ], + generate_languages = [ + "java", + "scala", + "strato", + ], + provides_java_name = "tsp-thrift-java", + provides_scala_name = "tsp-thrift-scala", +) diff --git a/topic-social-proof/server/src/main/thrift/service.thrift b/topic-social-proof/server/src/main/thrift/service.thrift new file mode 100644 index 000000000..70f3c5398 --- /dev/null +++ b/topic-social-proof/server/src/main/thrift/service.thrift @@ -0,0 +1,104 @@ +namespace java com.twitter.tsp.thriftjava +namespace py gen.twitter.tsp +#@namespace scala com.twitter.tsp.thriftscala +#@namespace strato com.twitter.tsp.strato + +include "com/twitter/contentrecommender/common.thrift" +include "com/twitter/simclusters_v2/identifier.thrift" +include "com/twitter/simclusters_v2/online_store.thrift" +include "topic_listing.thrift" + +enum TopicListingSetting { + All = 0 // All the existing Semantic Core Entity/Topics. ie., All topics on twitter, and may or may not have been launched yet. + Followable = 1 // All the topics which the user is allowed to follow. ie., topics that have shipped, and user may or may not be following it. + Following = 2 // Only topics the user is explicitly following + ImplicitFollow = 3 // The topics user has not followed but implicitly may follow. ie., Only topics that user has not followed. +} (hasPersonalData='false') + + +// used to tell Topic Social Proof endpoint which specific filtering can be bypassed +enum TopicSocialProofFilteringBypassMode { + NotInterested = 0 +} (hasPersonalData='false') + +struct TopicSocialProofRequest { + 1: required i64 userId(personalDataType = "UserId") + 2: required set tweetIds(personalDataType = 'TweetId') + 3: required common.DisplayLocation displayLocation + 4: required TopicListingSetting topicListingSetting + 5: required topic_listing.TopicListingViewerContext context + 6: optional set bypassModes + 7: optional map> tags +} + +struct TopicSocialProofOptions { + 1: required i64 userId(personalDataType = "UserId") + 2: required common.DisplayLocation displayLocation + 3: required TopicListingSetting topicListingSetting + 4: required topic_listing.TopicListingViewerContext context + 5: optional set bypassModes + 6: optional map> tags +} + +struct TopicSocialProofResponse { + 1: required map> socialProofs +}(hasPersonalData='false') + +// Distinguishes between how a topic tweet is generated. Useful for metric tracking and debugging +enum TopicTweetType { + // CrOON candidates + UserInterestedIn = 1 + Twistly = 2 + // crTopic candidates + SkitConsumerEmbeddings = 100 + SkitProducerEmbeddings = 101 + SkitHighPrecision = 102 + SkitInterestBrowser = 103 + Certo = 104 +}(persisted='true') + +struct TopicWithScore { + 1: required i64 topicId + 2: required double score // score used to rank topics relative to one another + 3: optional TopicTweetType algorithmType // how the topic is generated + 4: optional TopicFollowType topicFollowType // Whether the topic is being explicitly or implicily followed +}(persisted='true', hasPersonalData='false') + + +struct ScoreKey { + 1: required identifier.EmbeddingType userEmbeddingType + 2: required identifier.EmbeddingType topicEmbeddingType + 3: required online_store.ModelVersion modelVersion +}(persisted='true', hasPersonalData='false') + +struct UserTopicScore { + 1: required map scores +}(persisted='true', hasPersonalData='false') + + +enum TopicFollowType { + Following = 1 + ImplicitFollow = 2 +}(persisted='true') + +// Provide the Tags which provides the Recommended Tweets Source Signal and other context. +// Warning: Please don't use this tag in any ML Features or business logic. +enum MetricTag { + // Source Signal Tags + TweetFavorite = 0 + Retweet = 1 + + UserFollow = 101 + PushOpenOrNtabClick = 201 + + HomeTweetClick = 301 + HomeVideoView = 302 + HomeSongbirdShowMore = 303 + + + InterestsRankerRecentSearches = 401 // For Interests Candidate Expansion + + UserInterestedIn = 501 + MBCG = 503 + // Other Metric Tags +} (persisted='true', hasPersonalData='true') diff --git a/topic-social-proof/server/src/main/thrift/tweet_info.thrift b/topic-social-proof/server/src/main/thrift/tweet_info.thrift new file mode 100644 index 000000000..d32b1aeac --- /dev/null +++ b/topic-social-proof/server/src/main/thrift/tweet_info.thrift @@ -0,0 +1,26 @@ +namespace java com.twitter.tsp.thriftjava +namespace py gen.twitter.tsp +#@namespace scala com.twitter.tsp.thriftscala +#@namespace strato com.twitter.tsp.strato + +struct TspTweetInfo { + 1: required i64 authorId + 2: required i64 favCount + 3: optional string language + 6: optional bool hasImage + 7: optional bool hasVideo + 8: optional bool hasGif + 9: optional bool isNsfwAuthor + 10: optional bool isKGODenylist + 11: optional bool isNullcast + // available if the tweet contains video + 12: optional i32 videoDurationSeconds + 13: optional bool isHighMediaResolution + 14: optional bool isVerticalAspectRatio + // health signal scores + 15: optional bool isPassAgathaHealthFilterStrictest + 16: optional bool isPassTweetHealthFilterStrictest + 17: optional bool isReply + 18: optional bool hasMultipleMedia + 23: optional bool hasUrl +}(persisted='false', hasPersonalData='true')