diff --git a/representation-scorer/BUILD.bazel b/representation-scorer/BUILD.bazel new file mode 100644 index 000000000..1624a57d4 --- /dev/null +++ b/representation-scorer/BUILD.bazel @@ -0,0 +1 @@ +# This prevents SQ query from grabbing //:all since it traverses up once to find a BUILD diff --git a/representation-scorer/README.md b/representation-scorer/README.md new file mode 100644 index 000000000..b74e3472f --- /dev/null +++ b/representation-scorer/README.md @@ -0,0 +1,5 @@ +# Representation Scorer # + +**Representation Scorer** (RSX) serves as a centralized scoring system, offering SimClusters or other embedding-based scoring solutions as machine learning features. + +The Representation Scorer acquires user behavior data from the User Signal Service (USS) and extracts embeddings from the Representation Manager (RMS). It then calculates both pairwise and listwise features. These features are used at various stages, including candidate retrieval and ranking. \ No newline at end of file diff --git a/representation-scorer/bin/canary-check.sh b/representation-scorer/bin/canary-check.sh new file mode 100755 index 000000000..cbb31f9ad --- /dev/null +++ b/representation-scorer/bin/canary-check.sh @@ -0,0 +1,8 @@ +#!/bin/bash +# Exports the CANARY_CHECK_* settings for representation-scorer (instances 0-19) and forwards all arguments to canary_check.py. +# NOTE(review): the tool path below is relative - presumably this must be run from the repository root; confirm. + +export CANARY_CHECK_ROLE="representation-scorer" +export CANARY_CHECK_NAME="representation-scorer" +export CANARY_CHECK_INSTANCES="0-19" + +python3 relevance-platform/tools/canary_check.py "$@" + diff --git a/representation-scorer/bin/deploy.sh b/representation-scorer/bin/deploy.sh new file mode 100755 index 000000000..2f1ab8a69 --- /dev/null +++ b/representation-scorer/bin/deploy.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash +# Runs the relevance-platform deploy target through bazel with JOB=representation-scorer, forwarding all arguments to it. + +JOB=representation-scorer bazel run --ui_event_filters=-info,-stdout,-stderr --noshow_progress \ + //relevance-platform/src/main/python/deploy -- "$@" diff --git a/representation-scorer/bin/remote-debug-tunnel.sh b/representation-scorer/bin/remote-debug-tunnel.sh new file mode 100755 index 000000000..2a6e71511 --- /dev/null 
+++ b/representation-scorer/bin/remote-debug-tunnel.sh @@ -0,0 +1,66 @@ +#!/bin/bash +# Opens an ssh tunnel for remote debugging of a devel instance, after snoozing its Aurora health check. +# Abort on any failed command or on any reference to an unset variable. +set -eu + +DC="atla" +ROLE="$USER" +SERVICE="representation-scorer" +INSTANCE="0" +KEY="$DC/$ROLE/devel/$SERVICE/$INSTANCE" + +while test $# -gt 0; do + case "$1" in + -h|--help) + echo "$0 Set up an ssh tunnel for $SERVICE remote debugging and disable aurora health checks" + echo " " + echo "See representation-scorer/README.md for details of how to use this script, and go/remote-debug for" + echo "general information about remote debugging in Aurora" + echo " " + echo "Default instance if called with no args:" + echo " $KEY" + echo " " + echo "Positional args:" + echo " $0 [datacentre] [role] [service_name] [instance]" + echo " " + echo "Options:" + echo " -h, --help show brief help" + exit 0 + ;; + *) + break + ;; + esac +done + +if [ -n "${1-}" ]; then + DC="$1" +fi + +if [ -n "${2-}" ]; then + ROLE="$2" +fi + +if [ -n "${3-}" ]; then + SERVICE="$3" +fi + +if [ -n "${4-}" ]; then + INSTANCE="$4" +fi + +KEY="$DC/$ROLE/devel/$SERVICE/$INSTANCE" +read -p "Set up remote debugger tunnel for $KEY? (y/n) " -r CONFIRM +if [[ ! $CONFIRM =~ ^[Yy]$ ]]; then + echo "Exiting, tunnel not created" + exit 1 +fi + +echo "Disabling health check and opening tunnel. Exit with control-c when you're finished" +CMD="aurora task ssh $KEY -c 'touch .healthchecksnooze' && aurora task ssh $KEY -L '5005:debug' --ssh-options '-N -S none -v '" + +echo "Running $CMD" +# Run the commands directly instead of through eval, so the positional arguments in $KEY are never re-parsed as shell code. +aurora task ssh "$KEY" -c 'touch .healthchecksnooze' && aurora task ssh "$KEY" -L '5005:debug' --ssh-options '-N -S none -v ' + + + diff --git a/representation-scorer/docs/index.rst b/representation-scorer/docs/index.rst new file mode 100644 index 000000000..c4fd8966d --- /dev/null +++ b/representation-scorer/docs/index.rst @@ -0,0 +1,39 @@ +Representation Scorer (RSX) +########################### + +Overview +======== + +Representation Scorer (RSX) is a StratoFed service which serves scores for pairs of entities (User, Tweet, Topic...) based on some representation of those entities. 
For example, it serves User-Tweet scores based on the cosine similarity of SimClusters embeddings for each of these. It aims to provide these with low latency and at high scale, to support applications such as scoring for ANN candidate generation and feature hydration via the feature store. + + +Current use cases +----------------- + +RSX currently serves traffic for the following use cases: + +- User-Tweet similarity scores for Home ranking, using SimClusters embedding dot product +- Topic-Tweet similarity scores for topical tweet candidate generation and topic social proof, using SimClusters embedding cosine similarity and CERTO scores +- Tweet-Tweet and User-Tweet similarity scores for ANN candidate generation, using SimClusters embedding cosine similarity +- (in development) User-Tweet similarity scores for Home ranking, based on various aggregations of similarities with recent faves, retweets and follows performed by the user + +Getting Started +=============== + +Fetching scores +--------------- + +Scores for a single pair of entities are served from the recommendations/representation_scorer/score column; scores for a list of candidates against a single target are served from the recommendations/representation_scorer/listScore column. + +Using RSX for your application +------------------------------ + +RSX may be a good fit for your application if you need scores based on combinations of SimClusters embeddings for core nouns. We also plan to support other embeddings and scoring approaches in the future. + +.. 
toctree:: + :maxdepth: 2 + :hidden: + + index + + diff --git a/representation-scorer/server/BUILD b/representation-scorer/server/BUILD new file mode 100644 index 000000000..cc7325192 --- /dev/null +++ b/representation-scorer/server/BUILD @@ -0,0 +1,22 @@ +jvm_binary( + name = "bin", + basename = "representation-scorer", + main = "com.twitter.representationscorer.RepresentationScorerFedServerMain", + platform = "java8", + tags = ["bazel-compatible"], + dependencies = [ + "finatra/inject/inject-logback/src/main/scala", + "loglens/loglens-logback/src/main/scala/com/twitter/loglens/logback", + "representation-scorer/server/src/main/resources", + "representation-scorer/server/src/main/scala/com/twitter/representationscorer", + "twitter-server/logback-classic/src/main/scala", + ], +) + +# Aurora Workflows build phase convention requires a jvm_app named with ${project-name}-app +jvm_app( + name = "representation-scorer-app", + archive = "zip", + binary = ":bin", + tags = ["bazel-compatible"], +) diff --git a/representation-scorer/server/src/main/resources/BUILD b/representation-scorer/server/src/main/resources/BUILD new file mode 100644 index 000000000..150a224ff --- /dev/null +++ b/representation-scorer/server/src/main/resources/BUILD @@ -0,0 +1,9 @@ +resources( + sources = [ + "*.xml", + "*.yml", + "com/twitter/slo/slo.json", + "config/*.yml", + ], + tags = ["bazel-compatible"], +) diff --git a/representation-scorer/server/src/main/resources/com/twitter/slo/slo.json b/representation-scorer/server/src/main/resources/com/twitter/slo/slo.json new file mode 100644 index 000000000..836b44058 --- /dev/null +++ b/representation-scorer/server/src/main/resources/com/twitter/slo/slo.json @@ -0,0 +1,55 @@ +{ + "servers": [ + { + "name": "strato", + "indicators": [ + { + "id": "success_rate_3m", + "indicator_type": "SuccessRateIndicator", + "duration": 3, + "duration_unit": "MINUTES" + }, { + "id": "latency_3m_p99", + "indicator_type": "LatencyIndicator", + "duration": 3, + 
"duration_unit": "MINUTES", + "percentile": 0.99 + } + ], + "objectives": [ + { + "indicator": "success_rate_3m", + "objective_type": "SuccessRateObjective", + "operator": ">=", + "threshold": 0.995 + }, + { + "indicator": "latency_3m_p99", + "objective_type": "LatencyObjective", + "operator": "<=", + "threshold": 50 + } + ], + "long_term_objectives": [ + { + "id": "success_rate_28_days", + "objective_type": "SuccessRateObjective", + "operator": ">=", + "threshold": 0.993, + "duration": 28, + "duration_unit": "DAYS" + }, + { + "id": "latency_p99_28_days", + "objective_type": "LatencyObjective", + "operator": "<=", + "threshold": 60, + "duration": 28, + "duration_unit": "DAYS", + "percentile": 0.99 + } + ] + } + ], + "@version": 1 +} diff --git a/representation-scorer/server/src/main/resources/config/decider.yml b/representation-scorer/server/src/main/resources/config/decider.yml new file mode 100644 index 000000000..56ae90418 --- /dev/null +++ b/representation-scorer/server/src/main/resources/config/decider.yml @@ -0,0 +1,155 @@ +enableLogFavBasedApeEntity20M145KUpdatedEmbeddingCachedStore: + comment: "Enable to use the non-empty store for logFavBasedApeEntity20M145KUpdatedEmbeddingCachedStore (from 0% to 100%). 0 means use EMPTY readable store for all requests." + default_availability: 0 + +enableLogFavBasedApeEntity20M145K2020EmbeddingCachedStore: + comment: "Enable to use the non-empty store for logFavBasedApeEntity20M145K2020EmbeddingCachedStore (from 0% to 100%). 0 means use EMPTY readable store for all requests." + default_availability: 0 + +representation-scorer_forward_dark_traffic: + comment: "Defines the percentage of traffic to forward to diffy-proxy. 
Set to 0 to disable dark traffic forwarding" + default_availability: 0 + +"representation-scorer_load_shed_non_prod_callers": + comment: "Discard traffic from all non-prod callers" + default_availability: 0 + +enable_log_fav_based_tweet_embedding_20m145k2020_timeouts: + comment: "If enabled, set a timeout on calls to the logFavBased20M145K2020TweetEmbeddingStore" + default_availability: 0 + +log_fav_based_tweet_embedding_20m145k2020_timeout_value_millis: + comment: "The value of this decider defines the timeout (in milliseconds) to use on calls to the logFavBased20M145K2020TweetEmbeddingStore, i.e. 1.50% is 150ms. Only applied if enable_log_fav_based_tweet_embedding_20m145k2020_timeouts is true" + default_availability: 2000 + +enable_log_fav_based_tweet_embedding_20m145kUpdated_timeouts: + comment: "If enabled, set a timeout on calls to the logFavBased20M145KUpdatedTweetEmbeddingStore" + default_availability: 0 + +log_fav_based_tweet_embedding_20m145kUpdated_timeout_value_millis: + comment: "The value of this decider defines the timeout (in milliseconds) to use on calls to the logFavBased20M145KUpdatedTweetEmbeddingStore, i.e. 1.50% is 150ms. Only applied if enable_log_fav_based_tweet_embedding_20m145kUpdated_timeouts is true" + default_availability: 2000 + +enable_cluster_tweet_index_store_timeouts: + comment: "If enabled, set a timeout on calls to the ClusterTweetIndexStore" + default_availability: 0 + +cluster_tweet_index_store_timeout_value_millis: + comment: "The value of this decider defines the timeout (in milliseconds) to use on calls to the ClusterTweetIndexStore, i.e. 1.50% is 150ms. 
Only applied if enable_cluster_tweet_index_store_timeouts is true" + default_availability: 2000 + +representation_scorer_fetch_signal_share: + comment: "If enabled, fetches share signals from USS" + default_availability: 0 + +representation_scorer_fetch_signal_reply: + comment: "If enabled, fetches reply signals from USS" + default_availability: 0 + +representation_scorer_fetch_signal_original_tweet: + comment: "If enabled, fetches original tweet signals from USS" + default_availability: 0 + +representation_scorer_fetch_signal_video_playback: + comment: "If enabled, fetches video playback signals from USS" + default_availability: 0 + +representation_scorer_fetch_signal_block: + comment: "If enabled, fetches account block signals from USS" + default_availability: 0 + +representation_scorer_fetch_signal_mute: + comment: "If enabled, fetches account mute signals from USS" + default_availability: 0 + +representation_scorer_fetch_signal_report: + comment: "If enabled, fetches tweet report signals from USS" + default_availability: 0 + +representation_scorer_fetch_signal_dont_like: + comment: "If enabled, fetches tweet don't like signals from USS" + default_availability: 0 + +representation_scorer_fetch_signal_see_fewer: + comment: "If enabled, fetches tweet see fewer signals from USS" + default_availability: 0 + +# To create a new decider, add here with the same format and caller's details : "representation-scorer_load_shed_by_caller_id_twtr:{{role}}:{{name}}:{{environment}}:{{cluster}}" +# All the deciders below are generated by this script - ./strato/bin/fed deciders ./ --service-role=representation-scorer --service-name=representation-scorer +# If you need to run the script and paste the output, add only the prod deciders here. 
Non-prod ones are being taken care of by representation-scorer_load_shed_non_prod_callers + +"representation-scorer_load_shed_by_caller_id_all": + comment: "Reject all traffic from caller id: all" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:frigate:frigate-pushservice-canary:prod:atla": + comment: "Reject all traffic from caller id: twtr:svc:frigate:frigate-pushservice-canary:prod:atla" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:frigate:frigate-pushservice-canary:prod:pdxa": + comment: "Reject all traffic from caller id: twtr:svc:frigate:frigate-pushservice-canary:prod:pdxa" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:frigate:frigate-pushservice-send:prod:atla": + comment: "Reject all traffic from caller id: twtr:svc:frigate:frigate-pushservice-send:prod:atla" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:frigate:frigate-pushservice:prod:atla": + comment: "Reject all traffic from caller id: twtr:svc:frigate:frigate-pushservice:prod:atla" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:frigate:frigate-pushservice:prod:pdxa": + comment: "Reject all traffic from caller id: twtr:svc:frigate:frigate-pushservice:prod:pdxa" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:frigate:frigate-pushservice:staging:atla": + comment: "Reject all traffic from caller id: twtr:svc:frigate:frigate-pushservice:staging:atla" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:frigate:frigate-pushservice:staging:pdxa": + comment: "Reject all traffic from caller id: twtr:svc:frigate:frigate-pushservice:staging:pdxa" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:home-scorer:home-scorer:prod:atla": + comment: "Reject all traffic from caller id: twtr:svc:home-scorer:home-scorer:prod:atla" + 
default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:home-scorer:home-scorer:prod:pdxa": + comment: "Reject all traffic from caller id: twtr:svc:home-scorer:home-scorer:prod:pdxa" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:stratostore:stratoapi:prod:atla": + comment: "Reject all traffic from caller id: twtr:svc:stratostore:stratoapi:prod:atla" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:stratostore:stratoserver:prod:atla": + comment: "Reject all traffic from caller id: twtr:svc:stratostore:stratoserver:prod:atla" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:stratostore:stratoserver:prod:pdxa": + comment: "Reject all traffic from caller id: twtr:svc:stratostore:stratoserver:prod:pdxa" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:timelinescorer:timelinescorer:prod:atla": + comment: "Reject all traffic from caller id: twtr:svc:timelinescorer:timelinescorer:prod:atla" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:timelinescorer:timelinescorer:prod:pdxa": + comment: "Reject all traffic from caller id: twtr:svc:timelinescorer:timelinescorer:prod:pdxa" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:topic-social-proof:topic-social-proof:prod:atla": + comment: "Reject all traffic from caller id: twtr:svc:topic-social-proof:topic-social-proof:prod:atla" + default_availability: 0 + +"representation-scorer_load_shed_by_caller_id_twtr:svc:topic-social-proof:topic-social-proof:prod:pdxa": + comment: "Reject all traffic from caller id: twtr:svc:topic-social-proof:topic-social-proof:prod:pdxa" + default_availability: 0 + +"enable_sim_clusters_embedding_store_timeouts": + comment: "If enabled, set a timeout on calls to the SimClustersEmbeddingStore" + default_availability: 10000 + 
+sim_clusters_embedding_store_timeout_value_millis: + comment: "The value of this decider defines the timeout (in milliseconds) to use on calls to the SimClustersEmbeddingStore, i.e. 1.50% is 150ms. Only applied if enable_sim_clusters_embedding_store_timeouts is true" + default_availability: 2000 diff --git a/representation-scorer/server/src/main/resources/logback.xml b/representation-scorer/server/src/main/resources/logback.xml new file mode 100644 index 000000000..cf7028151 --- /dev/null +++ b/representation-scorer/server/src/main/resources/logback.xml @@ -0,0 +1,165 @@ + + + + + + + + + + + + + + + + + true + + + + + + + + + + + ${log.service.output} + + + ${log.service.output}.%d.gz + + 3GB + + 21 + true + + + %date %.-3level ${DEFAULT_SERVICE_PATTERN}%n + + + + + + ${log.access.output} + + + ${log.access.output}.%d.gz + + 100MB + + 7 + true + + + ${DEFAULT_ACCESS_PATTERN}%n + + + + + + true + ${log.lens.category} + ${log.lens.index} + ${log.lens.tag}/service + + %msg + + + + + + true + ${log.lens.category} + ${log.lens.index} + ${log.lens.tag}/access + + %msg + + + + + + allow_listed_pipeline_executions.log + + + allow_listed_pipeline_executions.log.%d.gz + + 100MB + + 7 + true + + + %date %.-3level ${DEFAULT_SERVICE_PATTERN}%n + + + + + + + + + + + + ${async_queue_size} + ${async_max_flush_time} + + + + + ${async_queue_size} + ${async_max_flush_time} + + + + + ${async_queue_size} + ${async_max_flush_time} + + + + + ${async_queue_size} + ${async_max_flush_time} + + + + + ${async_queue_size} + ${async_max_flush_time} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/BUILD b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/BUILD new file mode 100644 index 000000000..fdb60da54 --- /dev/null +++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/BUILD @@ -0,0 +1,13 @@ +scala_library( + compiler_option_sets = 
["fatal_warnings"], + platform = "java8", + tags = ["bazel-compatible"], + dependencies = [ + "finagle-internal/slo/src/main/scala/com/twitter/finagle/slo", + "finatra/inject/inject-thrift-client", + "representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns", + "strato/src/main/scala/com/twitter/strato/fed", + "strato/src/main/scala/com/twitter/strato/fed/server", + "twitter-server-internal/src/main/scala", + ], +) diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/RepresentationScorerFedServer.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/RepresentationScorerFedServer.scala new file mode 100644 index 000000000..a0a203311 --- /dev/null +++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/RepresentationScorerFedServer.scala @@ -0,0 +1,38 @@ +package com.twitter.representationscorer + +import com.google.inject.Module +import com.twitter.inject.thrift.modules.ThriftClientIdModule +import com.twitter.representationscorer.columns.ListScoreColumn +import com.twitter.representationscorer.columns.ScoreColumn +import com.twitter.representationscorer.columns.SimClustersRecentEngagementSimilarityColumn +import com.twitter.representationscorer.columns.SimClustersRecentEngagementSimilarityUserTweetEdgeColumn +import com.twitter.representationscorer.modules.CacheModule +import com.twitter.representationscorer.modules.EmbeddingStoreModule +import com.twitter.representationscorer.modules.RMSConfigModule +import com.twitter.representationscorer.modules.TimerModule +import com.twitter.representationscorer.twistlyfeatures.UserSignalServiceRecentEngagementsClientModule +import com.twitter.strato.fed._ +import com.twitter.strato.fed.server._ + +object RepresentationScorerFedServerMain extends RepresentationScorerFedServer + +trait RepresentationScorerFedServer extends StratoFedServer { + override def dest: String = 
"/s/representation-scorer/representation-scorer" + override val modules: Seq[Module] = + Seq( + CacheModule, + ThriftClientIdModule, + UserSignalServiceRecentEngagementsClientModule, + TimerModule, + RMSConfigModule, + EmbeddingStoreModule + ) + + override def columns: Seq[Class[_ <: StratoFed.Column]] = + Seq( + classOf[ListScoreColumn], + classOf[ScoreColumn], + classOf[SimClustersRecentEngagementSimilarityUserTweetEdgeColumn], + classOf[SimClustersRecentEngagementSimilarityColumn] + ) +} diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/BUILD b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/BUILD new file mode 100644 index 000000000..3352a51b9 --- /dev/null +++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/BUILD @@ -0,0 +1,16 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + platform = "java8", + tags = ["bazel-compatible"], + dependencies = [ + "content-recommender/thrift/src/main/thrift:thrift-scala", + "finatra/inject/inject-core/src/main/scala", + "representation-scorer/server/src/main/scala/com/twitter/representationscorer/common", + "representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules", + "representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore", + "representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures", + "representation-scorer/server/src/main/thrift:thrift-scala", + "strato/src/main/scala/com/twitter/strato/fed", + "strato/src/main/scala/com/twitter/strato/fed/server", + ], +) diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/Info.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/Info.scala new file mode 100644 index 000000000..3b14a491f --- /dev/null +++ 
b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/Info.scala @@ -0,0 +1,13 @@ +package com.twitter.representationscorer.columns + +import com.twitter.strato.config.{ContactInfo => StratoContactInfo} + +/** Holds the single Strato contact record that the columns in this package return from their `contactInfo` override. */ +object Info { + val contactInfo: StratoContactInfo = StratoContactInfo( + description = "Please contact Relevance Platform team for more details", + contactEmail = "no-reply@twitter.com", + ldapGroup = "representation-scorer-admins", + jiraProject = "JIRA", + links = Seq("http://go.twitter.biz/rsx-runbook") + ) +} diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/ListScoreColumn.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/ListScoreColumn.scala new file mode 100644 index 000000000..04d8b8cb1 --- /dev/null +++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/ListScoreColumn.scala @@ -0,0 +1,116 @@ +package com.twitter.representationscorer.columns + +import com.twitter.representationscorer.thriftscala.ListScoreId +import com.twitter.representationscorer.thriftscala.ListScoreResponse +import com.twitter.representationscorer.scorestore.ScoreStore +import com.twitter.representationscorer.thriftscala.ScoreResult +import com.twitter.simclusters_v2.common.SimClustersEmbeddingId.LongInternalId +import com.twitter.simclusters_v2.common.SimClustersEmbeddingId.LongSimClustersEmbeddingId +import com.twitter.simclusters_v2.thriftscala.Score +import com.twitter.simclusters_v2.thriftscala.ScoreId +import com.twitter.simclusters_v2.thriftscala.ScoreInternalId +import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId +import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingPairScoreId +import com.twitter.stitch +import com.twitter.stitch.Stitch +import com.twitter.strato.catalog.OpMetadata +import com.twitter.strato.config.ContactInfo +import com.twitter.strato.config.Policy +import 
com.twitter.strato.data.Conv +import com.twitter.strato.data.Description.PlainText +import com.twitter.strato.data.Lifecycle +import com.twitter.strato.fed._ +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.util.Future +import com.twitter.util.Return +import com.twitter.util.Throw +import javax.inject.Inject + +/** Strato column "recommendations/representation_scorer/listScore": scores every candidate id in the key against the key's single target id and returns one ScoreResult per candidate, in the order the candidates were supplied. */ +class ListScoreColumn @Inject() (scoreStore: ScoreStore) + extends StratoFed.Column("recommendations/representation_scorer/listScore") + with StratoFed.Fetch.Stitch { + + override val policy: Policy = Common.rsxReadPolicy + + override type Key = ListScoreId + override type View = Unit + override type Value = ListScoreResponse + + override val keyConv: Conv[Key] = ScroogeConv.fromStruct[ListScoreId] + override val viewConv: Conv[View] = Conv.ofType + override val valueConv: Conv[Value] = ScroogeConv.fromStruct[ListScoreResponse] + + override val contactInfo: ContactInfo = Info.contactInfo + + override val metadata: OpMetadata = OpMetadata( + lifecycle = Some(Lifecycle.Production), + description = Some( + PlainText( + "Scoring for multiple candidate entities against a single target entity" + )) + ) + + /** Builds one pairwise ScoreId per candidate, fetches them all from the uniform scoring store with a single multiGet, and assembles the per-candidate results. A candidate's score is None when the store returned no score for it, when its lookup failed, or when its id is not a Long id. A stitch.NotFound is reported as `missing`. */ + override def fetch(key: Key, view: View): Stitch[Result[Value]] = { + + val target = SimClustersEmbeddingId( + embeddingType = key.targetEmbeddingType, + modelVersion = key.modelVersion, + internalId = key.targetId + ) + val scoreIds = key.candidateIds.map { candidateId => + val candidate = SimClustersEmbeddingId( + embeddingType = key.candidateEmbeddingType, + modelVersion = key.modelVersion, + internalId = candidateId + ) + ScoreId( + algorithm = key.algorithm, + internalId = ScoreInternalId.SimClustersEmbeddingPairScoreId( + SimClustersEmbeddingPairScoreId(target, candidate) + ) + ) + } + + Stitch + .callFuture { + /* The ids are de-duplicated via toSet before the lookup; keys and vals come from unzipping the same result, so they are zipped back together below. */ + val (keys: Iterable[ScoreId], vals: Iterable[Future[Option[Score]]]) = + scoreStore.uniformScoringStore.multiGet(scoreIds.toSet).unzip + val results: Future[Iterable[Option[Score]]] = Future.collectToTry(vals.toSeq) map { + 
tryOptVals => + tryOptVals map { + case Return(Some(v)) => Some(v) + case Return(None) => None + /* NOTE(review): a failed lookup is silently turned into a missing score here - confirm this best-effort behaviour is intended. */ + case Throw(_) => None + } + } + /* Index the scores that were found by the candidate's Long id; pairs without a score or without a Long candidate id are dropped by collect. */ + val scoreMap: Future[Map[Long, Double]] = results.map { scores => + keys + .zip(scores).collect { + case ( + ScoreId( + _, + ScoreInternalId.SimClustersEmbeddingPairScoreId( + SimClustersEmbeddingPairScoreId( + _, + LongSimClustersEmbeddingId(candidateId)))), + Some(score)) => + (candidateId, score.score) + }.toMap + } + scoreMap + } + .map { (scores: Map[Long, Double]) => + /* The catch-all case below makes this collect total, so the response has exactly one entry per requested candidate. */ + val orderedScores = key.candidateIds.collect { + case LongInternalId(id) => ScoreResult(scores.get(id)) + case _ => + // This will return None scores for candidates which don't have Long ids, but that's fine: + // at the moment we're only scoring for Tweets + ScoreResult(None) + } + found(ListScoreResponse(orderedScores)) + } + .handle { + case stitch.NotFound => missing + } + } +} diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/ScoreColumn.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/ScoreColumn.scala new file mode 100644 index 000000000..6b565288b --- /dev/null +++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/ScoreColumn.scala @@ -0,0 +1,48 @@ +package com.twitter.representationscorer.columns + +import com.twitter.contentrecommender.thriftscala.ScoringResponse +import com.twitter.representationscorer.scorestore.ScoreStore +import com.twitter.simclusters_v2.thriftscala.ScoreId +import com.twitter.stitch +import com.twitter.stitch.Stitch +import com.twitter.strato.config.ContactInfo +import com.twitter.strato.config.Policy +import com.twitter.strato.catalog.OpMetadata +import com.twitter.strato.data.Conv +import com.twitter.strato.data.Lifecycle +import com.twitter.strato.data.Description.PlainText +import com.twitter.strato.fed._ +import com.twitter.strato.thrift.ScroogeConv +import javax.inject.Inject + +/** Strato column "recommendations/representation_scorer/score": returns the score for a single ScoreId, wrapped in a ScoringResponse. */ +class ScoreColumn 
@Inject() (scoreStore: ScoreStore)
+  extends StratoFed.Column("recommendations/representation_scorer/score")
+  with StratoFed.Fetch.Stitch {
+
+  // Single-key scoring column: the key is a ScoreId and the value wraps the resulting Score.
+  override val policy: Policy = Common.rsxReadPolicy
+
+  override type Key = ScoreId
+  override type View = Unit
+  override type Value = ScoringResponse
+
+  override val keyConv: Conv[Key] = ScroogeConv.fromStruct[ScoreId]
+  override val viewConv: Conv[View] = Conv.ofType
+  override val valueConv: Conv[Value] = ScroogeConv.fromStruct[ScoringResponse]
+
+  override val contactInfo: ContactInfo = Info.contactInfo
+
+  override val metadata: OpMetadata = OpMetadata(
+    lifecycle = Some(Lifecycle.Production),
+    description = Some(PlainText(
+      "The Uniform Scoring Endpoint in Representation Scorer for the Content-Recommender." +
+        " TDD: http://go/representation-scorer-tdd Guideline: http://go/uniform-scoring-guideline"))
+  )
+
+  /**
+   * Looks the key up in the Stitch view of the uniform scoring store.
+   *
+   * @return `found(ScoringResponse(Some(score)))` on success, `missing` when the lookup
+   *         fails with `stitch.NotFound`
+   */
+  override def fetch(key: Key, view: View): Stitch[Result[Value]] =
+    scoreStore
+      .uniformScoringStoreStitch(key)
+      .map(score => found(ScoringResponse(Some(score))))
+      .handle {
+        case stitch.NotFound => missing
+      }
+}
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/SimClustersRecentEngagementSimilarityColumn.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/SimClustersRecentEngagementSimilarityColumn.scala
new file mode 100644
index 000000000..e14a67eae
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/SimClustersRecentEngagementSimilarityColumn.scala
@@ -0,0 +1,52 @@
+package com.twitter.representationscorer.columns
+
+import com.twitter.representationscorer.common.TweetId
+import com.twitter.representationscorer.common.UserId
+import com.twitter.representationscorer.thriftscala.RecentEngagementSimilaritiesResponse
+import com.twitter.representationscorer.twistlyfeatures.Scorer
+import com.twitter.stitch
+import com.twitter.stitch.Stitch
+import
com.twitter.strato.catalog.OpMetadata
+import com.twitter.strato.config.ContactInfo
+import com.twitter.strato.config.Policy
+import com.twitter.strato.data.Conv
+import com.twitter.strato.data.Description.PlainText
+import com.twitter.strato.data.Lifecycle
+import com.twitter.strato.fed._
+import com.twitter.strato.thrift.ScroogeConv
+import javax.inject.Inject
+
+/**
+ * Strato column `recommendations/representation_scorer/simClustersRecentEngagementSimilarity`.
+ *
+ * Batch variant: the key is `(userId, tweetIds)` and the value wraps the scorer's results
+ * for all requested tweets.
+ */
+class SimClustersRecentEngagementSimilarityColumn @Inject() (scorer: Scorer)
+  extends StratoFed.Column(
+    "recommendations/representation_scorer/simClustersRecentEngagementSimilarity")
+  with StratoFed.Fetch.Stitch {
+
+  override val policy: Policy = Common.rsxReadPolicy
+
+  override type Key = (UserId, Seq[TweetId])
+  override type View = Unit
+  override type Value = RecentEngagementSimilaritiesResponse
+
+  // UserId and TweetId are aliases of Long, hence the (Long, Seq[Long]) conversion.
+  override val keyConv: Conv[Key] = Conv.ofType[(Long, Seq[Long])]
+  override val viewConv: Conv[View] = Conv.ofType
+  override val valueConv: Conv[Value] =
+    ScroogeConv.fromStruct[RecentEngagementSimilaritiesResponse]
+
+  override val contactInfo: ContactInfo = Info.contactInfo
+
+  override val metadata: OpMetadata = OpMetadata(
+    lifecycle = Some(Lifecycle.Production),
+    description = Some(
+      PlainText(
+        "User-Tweet scores based on the user's recent engagements for multiple tweets."
+      ))
+  )
+
+  /**
+   * Delegates to `scorer.get(userId, tweetIds)`.
+   *
+   * @return `found(RecentEngagementSimilaritiesResponse(results))`, or `missing` when the
+   *         scorer fails with `stitch.NotFound`
+   */
+  override def fetch(key: Key, view: View): Stitch[Result[Value]] =
+    scorer
+      .get(key._1, key._2)
+      .map(results => found(RecentEngagementSimilaritiesResponse(results)))
+      .handle {
+        case stitch.NotFound => missing
+      }
+}
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/SimClustersRecentEngagementSimilarityUserTweetEdgeColumn.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/SimClustersRecentEngagementSimilarityUserTweetEdgeColumn.scala
new file mode 100644
index 000000000..e54d3a71b
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/columns/SimClustersRecentEngagementSimilarityUserTweetEdgeColumn.scala
@@ -0,0 +1,52 @@
+package com.twitter.representationscorer.columns
+
+import com.twitter.representationscorer.common.TweetId
+import com.twitter.representationscorer.common.UserId
+import com.twitter.representationscorer.thriftscala.SimClustersRecentEngagementSimilarities
+import com.twitter.representationscorer.twistlyfeatures.Scorer
+import com.twitter.stitch
+import com.twitter.stitch.Stitch
+import com.twitter.strato.catalog.OpMetadata
+import com.twitter.strato.config.ContactInfo
+import com.twitter.strato.config.Policy
+import com.twitter.strato.data.Conv
+import com.twitter.strato.data.Description.PlainText
+import com.twitter.strato.data.Lifecycle
+import com.twitter.strato.fed._
+import com.twitter.strato.thrift.ScroogeConv
+import javax.inject.Inject
+
+class SimClustersRecentEngagementSimilarityUserTweetEdgeColumn @Inject() (scorer: Scorer)
+  extends StratoFed.Column(
+    "recommendations/representation_scorer/simClustersRecentEngagementSimilarity.UserTweetEdge")
+  with StratoFed.Fetch.Stitch {
+
+  override val policy: Policy = Common.rsxReadPolicy
+
+  override type Key = (UserId, TweetId)
+  override type View = Unit
+  override type Value = SimClustersRecentEngagementSimilarities
+
+  override val keyConv:
Conv[Key] = Conv.ofType[(Long, Long)]
+  override val viewConv: Conv[View] = Conv.ofType
+  override val valueConv: Conv[Value] =
+    ScroogeConv.fromStruct[SimClustersRecentEngagementSimilarities]
+
+  override val contactInfo: ContactInfo = Info.contactInfo
+
+  override val metadata: OpMetadata = OpMetadata(
+    lifecycle = Some(Lifecycle.Production),
+    description = Some(
+      PlainText(
+        "User-Tweet scores based on the user's recent engagements"
+      ))
+  )
+
+  /**
+   * Single-edge variant: delegates to `scorer.get(userId, tweetId)` and returns the result
+   * as-is, or `missing` when the scorer fails with `stitch.NotFound`.
+   */
+  override def fetch(key: Key, view: View): Stitch[Result[Value]] =
+    scorer
+      .get(key._1, key._2)
+      .map(found(_))
+      .handle {
+        case stitch.NotFound => missing
+      }
+}
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/common/BUILD b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/common/BUILD
new file mode 100644
index 000000000..018cef9eb
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/common/BUILD
@@ -0,0 +1,9 @@
+scala_library(
+    compiler_option_sets = ["fatal_warnings"],
+    platform = "java8",
+    tags = ["bazel-compatible"],
+    dependencies = [
+        "decider/src/main/scala",
+        "src/scala/com/twitter/simclusters_v2/common",
+    ],
+)
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/common/DeciderConstants.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/common/DeciderConstants.scala
new file mode 100644
index 000000000..838835616
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/common/DeciderConstants.scala
@@ -0,0 +1,7 @@
+package com.twitter.representationscorer
+
+/**
+ * Names of the decider keys used by this service.
+ * NOTE(review): the file lives under `common/` but is declared in the parent package.
+ */
+object DeciderConstants {
+  // Gate that switches the SimClusters embedding store timeout on or off.
+  val enableSimClustersEmbeddingStoreTimeouts = "enable_sim_clusters_embedding_store_timeouts"
+  // Key holding the timeout value; the name indicates milliseconds.
+  val simClustersEmbeddingStoreTimeoutValueMillis =
+    "sim_clusters_embedding_store_timeout_value_millis"
+}
diff --git
a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/common/RepresentationScorerDecider.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/common/RepresentationScorerDecider.scala
new file mode 100644
index 000000000..5aa4b4f2c
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/common/RepresentationScorerDecider.scala
@@ -0,0 +1,27 @@
+package com.twitter.representationscorer.common
+
+import com.twitter.decider.Decider
+import com.twitter.decider.RandomRecipient
+import com.twitter.decider.Recipient
+import com.twitter.simclusters_v2.common.DeciderGateBuilderWithIdHashing
+import javax.inject.Inject
+import javax.inject.Singleton
+
+/**
+ * Thin injectable wrapper around a `Decider` that exposes feature-availability checks
+ * and a gate builder constructed from the same decider.
+ */
+@Singleton
+case class RepresentationScorerDecider @Inject() (decider: Decider) {
+
+  val deciderGateBuilder = new DeciderGateBuilderWithIdHashing(decider)
+
+  /**
+   * Forwards directly to `decider.isAvailable(feature, recipient)`.
+   */
+  def isAvailable(feature: String, recipient: Option[Recipient]): Boolean = {
+    decider.isAvailable(feature, recipient)
+  }
+
+  /**
+   * When useRandomRecipient is set to false, the decider is either completely on or off.
+   * When useRandomRecipient is set to true, the decider is on for the specified % of traffic.
+   */
+  def isAvailable(feature: String, useRandomRecipient: Boolean = true): Boolean = {
+    if (useRandomRecipient) isAvailable(feature, Some(RandomRecipient))
+    else isAvailable(feature, None)
+  }
+}
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/common/package.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/common/package.scala
new file mode 100644
index 000000000..c5bf9c60a
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/common/package.scala
@@ -0,0 +1,6 @@
+package com.twitter.representationscorer
+
+// Readability aliases: both ids are plain Longs.
+package object common {
+  type UserId = Long
+  type TweetId = Long
+}
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/BUILD b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/BUILD
new file mode 100644
index 000000000..c73f2a68e
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/BUILD
@@ -0,0 +1,19 @@
+scala_library(
+    compiler_option_sets = ["fatal_warnings"],
+    platform = "java8",
+    tags = ["bazel-compatible"],
+    dependencies = [
+        "finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/authentication",
+        "finagle/finagle-stats",
+        "finatra/inject/inject-core/src/main/scala",
+        "representation-manager/client/src/main/scala/com/twitter/representation_manager",
+        "representation-manager/client/src/main/scala/com/twitter/representation_manager/config",
+        "representation-manager/server/src/main/scala/com/twitter/representation_manager/migration",
+        "representation-scorer/server/src/main/scala/com/twitter/representationscorer/common",
+        "servo/util",
+        "src/scala/com/twitter/simclusters_v2/stores",
+        "src/scala/com/twitter/storehaus_internal/memcache",
+        "src/scala/com/twitter/storehaus_internal/util",
+        "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala",
+    ],
+)
diff --git
a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/CacheModule.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/CacheModule.scala
new file mode 100644
index 000000000..b8b815872
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/CacheModule.scala
@@ -0,0 +1,34 @@
+package com.twitter.representationscorer.modules
+
+import com.google.inject.Provides
+import com.twitter.finagle.memcached.Client
+import javax.inject.Singleton
+import com.twitter.conversions.DurationOps._
+import com.twitter.inject.TwitterModule
+import com.twitter.finagle.mtls.authentication.ServiceIdentifier
+import com.twitter.finagle.stats.StatsReceiver
+import com.twitter.storehaus_internal.memcache.MemcacheStore
+import com.twitter.storehaus_internal.util.ClientName
+import com.twitter.storehaus_internal.util.ZkEndPoint
+
+/**
+ * Guice module providing the memcached `Client`, configured from command-line flags.
+ */
+object CacheModule extends TwitterModule {
+
+  // None of these flags declares a default value here.
+  private val cacheDest = flag[String]("cache_module.dest", "Path to memcache service")
+  // Interpreted as milliseconds (see `timeout().milliseconds` below).
+  private val timeout = flag[Int]("memcache.timeout", "Memcache client timeout")
+  private val retries = flag[Int]("memcache.retries", "Memcache timeout retries")
+
+  /**
+   * Builds the singleton memcached client.
+   *
+   * @param serviceIdentifier identity passed through to the client
+   * @param stats             receiver; client stats are scoped under "cache_client"
+   */
+  @Singleton
+  @Provides
+  def providesCache(
+    serviceIdentifier: ServiceIdentifier,
+    stats: StatsReceiver
+  ): Client =
+    MemcacheStore.memcachedClient(
+      // NOTE(review): the client is named after representation *manager* although this
+      // module belongs to representation-scorer - confirm this is intentional.
+      name = ClientName("memcache_representation_manager"),
+      dest = ZkEndPoint(cacheDest()),
+      timeout = timeout().milliseconds,
+      retries = retries(),
+      statsReceiver = stats.scope("cache_client"),
+      serviceIdentifier = serviceIdentifier
+    )
+}
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/EmbeddingStoreModule.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/EmbeddingStoreModule.scala
new file mode 100644
index 000000000..bff5d491c
--- /dev/null
+++
b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/EmbeddingStoreModule.scala
@@ -0,0 +1,100 @@
+package com.twitter.representationscorer.modules
+
+import com.google.inject.Provides
+import com.twitter.decider.Decider
+import com.twitter.finagle.memcached.{Client => MemcachedClient}
+import com.twitter.finagle.mtls.authentication.ServiceIdentifier
+import com.twitter.finagle.stats.StatsReceiver
+import com.twitter.finagle.thrift.ClientId
+import com.twitter.hermit.store.common.ObservedReadableStore
+import com.twitter.inject.TwitterModule
+import com.twitter.relevance_platform.common.readablestore.ReadableStoreWithTimeout
+import com.twitter.representation_manager.migration.LegacyRMS
+import com.twitter.representationscorer.DeciderConstants
+import com.twitter.simclusters_v2.common.SimClustersEmbedding
+import com.twitter.simclusters_v2.stores.SimClustersEmbeddingStore
+import com.twitter.simclusters_v2.thriftscala.EmbeddingType
+import com.twitter.simclusters_v2.thriftscala.EmbeddingType._
+import com.twitter.simclusters_v2.thriftscala.ModelVersion
+import com.twitter.simclusters_v2.thriftscala.ModelVersion._
+import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
+import com.twitter.storehaus.ReadableStore
+import com.twitter.util.Timer
+import javax.inject.Singleton
+
+/**
+ * Guice module providing the SimClusters embedding store used for scoring.
+ */
+object EmbeddingStoreModule extends TwitterModule {
+  /**
+   * Builds the embedding store in three layers:
+   *  1. per-(EmbeddingType, ModelVersion) stores taken from a `LegacyRMS` client, combined
+   *     via `SimClustersEmbeddingStore.buildWithDecider`;
+   *  2. a `ReadableStoreWithTimeout` controlled by the two `DeciderConstants` keys;
+   *  3. an `ObservedReadableStore` reporting under "simClusters_embeddings_store".
+   */
+  @Singleton
+  @Provides
+  def providesEmbeddingStore(
+    memCachedClient: MemcachedClient,
+    serviceIdentifier: ServiceIdentifier,
+    clientId: ClientId,
+    timer: Timer,
+    decider: Decider,
+    stats: StatsReceiver
+  ): ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding] = {
+    val cacheHashKeyPrefix: String = "RMS"
+    val embeddingStoreClient = new LegacyRMS(
+      serviceIdentifier,
+      memCachedClient,
+      stats,
+      decider,
+      clientId,
+      timer,
+      cacheHashKeyPrefix
+    )
+
+    // Every entry is registered for Model20m145k2020 only.
+    val underlyingStores: Map[
+      (EmbeddingType, ModelVersion),
+      ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding]
+    ] = Map(
+      // Tweet Embeddings
+      (
+        LogFavBasedTweet,
+        Model20m145k2020) -> embeddingStoreClient.logFavBased20M145K2020TweetEmbeddingStore,
+      (
+        LogFavLongestL2EmbeddingTweet,
+        Model20m145k2020) -> embeddingStoreClient.logFavBasedLongestL2Tweet20M145K2020EmbeddingStore,
+      // InterestedIn Embeddings
+      (
+        LogFavBasedUserInterestedInFromAPE,
+        Model20m145k2020) -> embeddingStoreClient.LogFavBasedInterestedInFromAPE20M145K2020Store,
+      (
+        FavBasedUserInterestedIn,
+        Model20m145k2020) -> embeddingStoreClient.favBasedUserInterestedIn20M145K2020Store,
+      // Author Embeddings
+      (
+        FavBasedProducer,
+        Model20m145k2020) -> embeddingStoreClient.favBasedProducer20M145K2020EmbeddingStore,
+      // Entity Embeddings
+      (
+        LogFavBasedKgoApeTopic,
+        Model20m145k2020) -> embeddingStoreClient.logFavBasedApeEntity20M145K2020EmbeddingCachedStore,
+      (FavTfgTopic, Model20m145k2020) -> embeddingStoreClient.favBasedTfgTopicEmbedding2020Store,
+    )
+
+    val simClustersEmbeddingStore: ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding] = {
+      val underlying: ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding] =
+        SimClustersEmbeddingStore.buildWithDecider(
+          underlyingStores = underlyingStores,
+          decider = decider,
+          statsReceiver = stats.scope("simClusters_embeddings_store_deciderable")
+        )
+
+      // Timeout enablement and duration are both read from decider keys.
+      val underlyingWithTimeout: ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding] =
+        new ReadableStoreWithTimeout(
+          rs = underlying,
+          decider = decider,
+          enableTimeoutDeciderKey = DeciderConstants.enableSimClustersEmbeddingStoreTimeouts,
+          timeoutValueKey = DeciderConstants.simClustersEmbeddingStoreTimeoutValueMillis,
+          timer = timer,
+          statsReceiver = stats.scope("simClusters_embedding_store_timeouts")
+        )
+
+      ObservedReadableStore(
+        store = underlyingWithTimeout
+      )(stats.scope("simClusters_embeddings_store"))
+    }
+    simClustersEmbeddingStore
+  }
+}
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/RMSConfigModule.scala
b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/RMSConfigModule.scala
new file mode 100644
index 000000000..08ac0cb93
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/RMSConfigModule.scala
@@ -0,0 +1,63 @@
+package com.twitter.representationscorer.modules
+
+import com.google.inject.Provides
+import com.twitter.conversions.DurationOps._
+import com.twitter.inject.TwitterModule
+import com.twitter.representation_manager.config.ClientConfig
+import com.twitter.representation_manager.config.EnabledInMemoryCacheParams
+import com.twitter.representation_manager.config.InMemoryCacheParams
+import com.twitter.simclusters_v2.thriftscala.EmbeddingType
+import com.twitter.simclusters_v2.thriftscala.EmbeddingType._
+import com.twitter.simclusters_v2.thriftscala.ModelVersion
+import com.twitter.simclusters_v2.thriftscala.ModelVersion._
+import javax.inject.Singleton
+
+/**
+ * Guice module providing the representation-manager `ClientConfig` with per-embedding
+ * in-memory cache overrides.
+ */
+object RMSConfigModule extends TwitterModule {
+  /**
+   * Cache name of the form `<embeddingType>_<modelVersion>_in_mem_cache`.
+   * NOTE(review): the parameter name `embedingType` is misspelled; renaming it would break
+   * any caller that passes it as a named argument.
+   */
+  def getCacheName(embedingType: EmbeddingType, modelVersion: ModelVersion): String =
+    s"${embedingType.name}_${modelVersion.name}_in_mem_cache"
+
+  /**
+   * Builds the client config. Each (EmbeddingType, ModelVersion) pair gets its own TTL,
+   * key limit and cache name; the trailing size comments are the authors' estimates.
+   */
+  @Singleton
+  @Provides
+  def providesRMSClientConfig: ClientConfig = {
+    val cacheParamsMap: Map[
+      (EmbeddingType, ModelVersion),
+      InMemoryCacheParams
+    ] = Map(
+      // Tweet Embeddings
+      (LogFavBasedTweet, Model20m145k2020) -> EnabledInMemoryCacheParams(
+        ttl = 10.minutes,
+        maxKeys = 1048575, // 800MB
+        cacheName = getCacheName(LogFavBasedTweet, Model20m145k2020)),
+      (LogFavLongestL2EmbeddingTweet, Model20m145k2020) -> EnabledInMemoryCacheParams(
+        ttl = 5.minute,
+        maxKeys = 1048575, // 800MB
+        cacheName = getCacheName(LogFavLongestL2EmbeddingTweet, Model20m145k2020)),
+      // User - KnownFor Embeddings
+      (FavBasedProducer, Model20m145k2020) -> EnabledInMemoryCacheParams(
+        ttl = 1.day,
+        maxKeys = 500000, // 400MB
+        cacheName = getCacheName(FavBasedProducer, Model20m145k2020)),
+      // User - InterestedIn Embeddings
+      (LogFavBasedUserInterestedInFromAPE, Model20m145k2020) -> EnabledInMemoryCacheParams(
+        ttl = 6.hours,
+        maxKeys = 262143,
+        cacheName = getCacheName(LogFavBasedUserInterestedInFromAPE, Model20m145k2020)),
+      (FavBasedUserInterestedIn, Model20m145k2020) -> EnabledInMemoryCacheParams(
+        ttl = 6.hours,
+        maxKeys = 262143,
+        cacheName = getCacheName(FavBasedUserInterestedIn, Model20m145k2020)),
+      // Topic Embeddings
+      (FavTfgTopic, Model20m145k2020) -> EnabledInMemoryCacheParams(
+        ttl = 12.hours,
+        maxKeys = 262143, // 200MB
+        cacheName = getCacheName(FavTfgTopic, Model20m145k2020)),
+      (LogFavBasedKgoApeTopic, Model20m145k2020) -> EnabledInMemoryCacheParams(
+        ttl = 6.hours,
+        maxKeys = 262143,
+        cacheName = getCacheName(LogFavBasedKgoApeTopic, Model20m145k2020)),
+    )
+
+    new ClientConfig(inMemCacheParamsOverrides = cacheParamsMap)
+  }
+
+}
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/TimerModule.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/TimerModule.scala
new file mode 100644
index 000000000..b425d516a
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/modules/TimerModule.scala
@@ -0,0 +1,13 @@
+package com.twitter.representationscorer.modules
+
+import com.google.inject.Provides
+import com.twitter.finagle.util.DefaultTimer
+import com.twitter.inject.TwitterModule
+import com.twitter.util.Timer
+import javax.inject.Singleton
+
+/** Guice module binding `Timer` to Finagle's `DefaultTimer`. */
+object TimerModule extends TwitterModule {
+  @Singleton
+  @Provides
+  def providesTimer: Timer = DefaultTimer
+}
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/BUILD b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/BUILD
new file mode 100644
index 000000000..3c259cfc4
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/BUILD
@@ -0,0 +1,19 @@
+scala_library(
+    compiler_option_sets = ["fatal_warnings"],
+    platform = "java8",
+    tags = ["bazel-compatible"],
+    dependencies = [
+        "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/util",
+        "hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common",
+        "relevance-platform/src/main/scala/com/twitter/relevance_platform/common/injection",
+        "representation-manager/client/src/main/scala/com/twitter/representation_manager",
+        "representation-manager/client/src/main/scala/com/twitter/representation_manager/config",
+        "representation-scorer/server/src/main/scala/com/twitter/representationscorer/common",
+        "src/scala/com/twitter/simclusters_v2/score",
+        "src/scala/com/twitter/topic_recos/common",
+        "src/scala/com/twitter/topic_recos/stores",
+        "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala",
+        "src/thrift/com/twitter/topic_recos:topic_recos-thrift-scala",
+        "stitch/stitch-storehaus",
+    ],
+)
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/ScoreStore.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/ScoreStore.scala
new file mode 100644
index 000000000..db7cbefa9
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/ScoreStore.scala
@@ -0,0 +1,168 @@
+package com.twitter.representationscorer.scorestore
+
+import com.twitter.bijection.scrooge.BinaryScalaCodec
+import com.twitter.conversions.DurationOps._
+import com.twitter.finagle.memcached.Client
+import com.twitter.finagle.stats.StatsReceiver
+import com.twitter.hashing.KeyHasher
+import com.twitter.hermit.store.common.ObservedCachedReadableStore
+import com.twitter.hermit.store.common.ObservedMemcachedReadableStore
+import com.twitter.hermit.store.common.ObservedReadableStore
+import com.twitter.relevance_platform.common.injection.LZ4Injection
+import com.twitter.simclusters_v2.common.SimClustersEmbedding
+import com.twitter.simclusters_v2.score.ScoreFacadeStore
+import com.twitter.simclusters_v2.score.SimClustersEmbeddingPairScoreStore
+import com.twitter.simclusters_v2.thriftscala.EmbeddingType.FavTfgTopic
+import com.twitter.simclusters_v2.thriftscala.EmbeddingType.LogFavBasedKgoApeTopic
+import com.twitter.simclusters_v2.thriftscala.EmbeddingType.LogFavBasedTweet
+import com.twitter.simclusters_v2.thriftscala.ModelVersion.Model20m145kUpdated
+import com.twitter.simclusters_v2.thriftscala.Score
+import com.twitter.simclusters_v2.thriftscala.ScoreId
+import com.twitter.simclusters_v2.thriftscala.ScoringAlgorithm
+import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
+import com.twitter.stitch.storehaus.StitchOfReadableStore
+import com.twitter.storehaus.ReadableStore
+import com.twitter.strato.client.{Client => StratoClient}
+import com.twitter.topic_recos.stores.CertoTweetTopicScoresStore
+import javax.inject.Inject
+import javax.inject.Singleton
+
+/**
+ * Central registry of score stores.
+ *
+ * Builds one store per `ScoringAlgorithm` and exposes them through a single facade,
+ * `uniformScoringStore` (plus a Stitch view of it), which the Strato columns query.
+ */
+@Singleton()
+class ScoreStore @Inject() (
+  simClustersEmbeddingStore: ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding],
+  stratoClient: StratoClient,
+  representationScorerCacheClient: Client,
+  stats: StatsReceiver) {
+
+  // Used to shorten memcache keys for the Certo store below.
+  private val keyHasher = KeyHasher.FNV1A_64
+  private val statsReceiver = stats.scope("score_store")
+
+  /** ** Score Store *****/
+  // The seven pair-embedding stores below all read from simClustersEmbeddingStore and differ
+  // only in the similarity/distance function and the stats scope.
+  private val simClustersEmbeddingCosineSimilarityScoreStore =
+    ObservedReadableStore(
+      SimClustersEmbeddingPairScoreStore
+        .buildCosineSimilarityStore(simClustersEmbeddingStore)
+        .toThriftStore
+    )(statsReceiver.scope("simClusters_embedding_cosine_similarity_score_store"))
+
+  private val simClustersEmbeddingDotProductScoreStore =
+    ObservedReadableStore(
+      SimClustersEmbeddingPairScoreStore
+        .buildDotProductStore(simClustersEmbeddingStore)
+        .toThriftStore
+    )(statsReceiver.scope("simClusters_embedding_dot_product_score_store"))
+
+  private val simClustersEmbeddingJaccardSimilarityScoreStore =
+    ObservedReadableStore(
+      SimClustersEmbeddingPairScoreStore
+        .buildJaccardSimilarityStore(simClustersEmbeddingStore)
+        .toThriftStore
+    )(statsReceiver.scope("simClusters_embedding_jaccard_similarity_score_store"))
+
+  private val simClustersEmbeddingEuclideanDistanceScoreStore =
+    ObservedReadableStore(
+      SimClustersEmbeddingPairScoreStore
+        .buildEuclideanDistanceStore(simClustersEmbeddingStore)
+        .toThriftStore
+    )(statsReceiver.scope("simClusters_embedding_euclidean_distance_score_store"))
+
+  private val simClustersEmbeddingManhattanDistanceScoreStore =
+    ObservedReadableStore(
+      SimClustersEmbeddingPairScoreStore
+        .buildManhattanDistanceStore(simClustersEmbeddingStore)
+        .toThriftStore
+    )(statsReceiver.scope("simClusters_embedding_manhattan_distance_score_store"))
+
+  private val simClustersEmbeddingLogCosineSimilarityScoreStore =
+    ObservedReadableStore(
+      SimClustersEmbeddingPairScoreStore
+        .buildLogCosineSimilarityStore(simClustersEmbeddingStore)
+        .toThriftStore
+    )(statsReceiver.scope("simClusters_embedding_log_cosine_similarity_score_store"))
+
+  private val simClustersEmbeddingExpScaledCosineSimilarityScoreStore =
+    ObservedReadableStore(
+      SimClustersEmbeddingPairScoreStore
+        .buildExpScaledCosineSimilarityStore(simClustersEmbeddingStore)
+        .toThriftStore
+    )(statsReceiver.scope("simClusters_embedding_exp_scaled_cosine_similarity_score_store"))
+
+  // Use the default setting
+  // NOTE(review): this uses Model20m145kUpdated, while the embedding stores registered in
+  // EmbeddingStoreModule are keyed by Model20m145k2020 - confirm the model version is intended.
+  private val topicTweetRankingScoreStore =
+    TopicTweetRankingScoreStore.buildTopicTweetRankingStore(
+      FavTfgTopic,
+      LogFavBasedKgoApeTopic,
+      LogFavBasedTweet,
+      Model20m145kUpdated,
+      consumerEmbeddingMultiplier = 1.0,
+      producerEmbeddingMultiplier = 1.0
+    )
+
+  private val topicTweetsCortexThresholdStore = TopicTweetsCosineSimilarityAggregateStore(
+    TopicTweetsCosineSimilarityAggregateStore.DefaultScoreKeys,
+    statsReceiver.scope("topic_tweets_cortex_threshold_store")
+  )
+
+  /**
+   * Certo topic-tweet scores, read through two cache layers: an in-process cache
+   * (5 minute TTL, up to 1,000,000 keys) in front of memcache (10 minute TTL) in front of
+   * the Strato-backed store.
+   */
+  val topicTweetCertoScoreStore: ObservedCachedReadableStore[ScoreId, Score] = {
+    val underlyingStore = ObservedReadableStore(
+      TopicTweetCertoScoreStore(CertoTweetTopicScoresStore.prodStore(stratoClient))
+    )(statsReceiver.scope("topic_tweet_certo_score_store"))
+
+    val memcachedStore = ObservedMemcachedReadableStore
+      .fromCacheClient(
+        backingStore = underlyingStore,
+        cacheClient = representationScorerCacheClient,
+        ttl = 10.minutes
+      )(
+        valueInjection = LZ4Injection.compose(BinaryScalaCodec(Score)),
+        statsReceiver = statsReceiver.scope("topic_tweet_certo_store_memcache"),
+        // Key is the FNV1A-64 hash of the ScoreId's string form; getBytes uses the
+        // platform default charset.
+        keyToString = { k: ScoreId =>
+          s"certocs:${keyHasher.hashKey(k.toString.getBytes)}"
+        }
+      )
+
+    ObservedCachedReadableStore.from[ScoreId, Score](
+      memcachedStore,
+      ttl = 5.minutes,
+      maxKeys = 1000000,
+      cacheName = "topic_tweet_certo_store_cache",
+      windowSize = 10000L
+    )(statsReceiver.scope("topic_tweet_certo_store_cache"))
+  }
+
+  /**
+   * Facade that routes each `ScoreId` to a store by its scoring algorithm.
+   * NOTE(review): this passes the unscoped `stats`, unlike the stores above which use the
+   * "score_store"-scoped receiver - confirm this is intentional.
+   */
+  val uniformScoringStore: ReadableStore[ScoreId, Score] =
+    ScoreFacadeStore.buildWithMetrics(
+      readableStores = Map(
+        ScoringAlgorithm.PairEmbeddingCosineSimilarity ->
+          simClustersEmbeddingCosineSimilarityScoreStore,
+        ScoringAlgorithm.PairEmbeddingDotProduct ->
+          simClustersEmbeddingDotProductScoreStore,
+        ScoringAlgorithm.PairEmbeddingJaccardSimilarity ->
+          simClustersEmbeddingJaccardSimilarityScoreStore,
+        ScoringAlgorithm.PairEmbeddingEuclideanDistance ->
+          simClustersEmbeddingEuclideanDistanceScoreStore,
+        ScoringAlgorithm.PairEmbeddingManhattanDistance ->
+          simClustersEmbeddingManhattanDistanceScoreStore,
+        ScoringAlgorithm.PairEmbeddingLogCosineSimilarity ->
+          simClustersEmbeddingLogCosineSimilarityScoreStore,
+        ScoringAlgorithm.PairEmbeddingExpScaledCosineSimilarity ->
+          simClustersEmbeddingExpScaledCosineSimilarityScoreStore,
+        // Certo normalized cosine score between topic-tweet pairs
+        ScoringAlgorithm.CertoNormalizedCosineScore
+          -> topicTweetCertoScoreStore,
+        // Certo normalized dot-product score between topic-tweet pairs
+        ScoringAlgorithm.CertoNormalizedDotProductScore
+          -> topicTweetCertoScoreStore
+      ),
+      aggregatedStores = Map(
+        ScoringAlgorithm.WeightedSumTopicTweetRanking ->
+          topicTweetRankingScoreStore,
+        ScoringAlgorithm.CortexTopicTweetLabel ->
+          topicTweetsCortexThresholdStore,
+      ),
+      statsReceiver = stats
+    )
+
+  // Stitch-returning view of uniformScoringStore, used by the single-key score column.
+  val uniformScoringStoreStitch: ScoreId => com.twitter.stitch.Stitch[Score] =
+    StitchOfReadableStore(uniformScoringStore)
+}
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/TopicTweetCertoScoreStore.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/TopicTweetCertoScoreStore.scala
new file mode 100644
index 000000000..b6216985f
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/TopicTweetCertoScoreStore.scala
@@ -0,0 +1,106 @@
+package com.twitter.representationscorer.scorestore
+
+import com.twitter.simclusters_v2.common.TweetId
+import com.twitter.simclusters_v2.thriftscala.ScoreInternalId.GenericPairScoreId
+import com.twitter.simclusters_v2.thriftscala.ScoringAlgorithm.CertoNormalizedDotProductScore
+import com.twitter.simclusters_v2.thriftscala.ScoringAlgorithm.CertoNormalizedCosineScore
+import com.twitter.simclusters_v2.thriftscala.InternalId
+import com.twitter.simclusters_v2.thriftscala.TopicId
+import com.twitter.simclusters_v2.thriftscala.{Score => ThriftScore}
+import com.twitter.simclusters_v2.thriftscala.{ScoreId => ThriftScoreId}
+import com.twitter.storehaus.FutureOps
+import com.twitter.storehaus.ReadableStore
+import com.twitter.topic_recos.thriftscala.Scores
+import com.twitter.topic_recos.thriftscala.TopicToScores
+import com.twitter.util.Future
+
+/**
+ * Score store to get Certo scores.
+ * Currently, the store supports two Scoring Algorithms (i.e., two types of Certo scores):
+ * 1. NormalizedDotProduct
+ * 2. NormalizedCosine
+ * Querying with corresponding scoring algorithms results in different Certo scores.
+ */
+case class TopicTweetCertoScoreStore(certoStratoStore: ReadableStore[TweetId, TopicToScores])
+    extends ReadableStore[ThriftScoreId, ThriftScore] {
+
+  /**
+   * Resolves a batch of score ids with a single `multiGet` against the Certo store.
+   *
+   * Only keys of the form `GenericPairScoreId(tweet, topic)` (in either order) combined with
+   * one of the two Certo algorithms can produce a score; every other key resolves to `None`.
+   *
+   * @param ks score ids to resolve
+   * @return one future per requested key
+   */
+  override def multiGet[K1 <: ThriftScoreId](ks: Set[K1]): Map[K1, Future[Option[ThriftScore]]] = {
+    // Collect the tweet id of every generic pair key. The match is total on purpose: a pair
+    // without a tweet id is skipped here and resolves to None below, instead of throwing a
+    // MatchError that would fail the whole batch.
+    val tweetIds =
+      ks.map(_.internalId).flatMap {
+        case GenericPairScoreId(scoreId) =>
+          (scoreId.id1, scoreId.id2) match {
+            case (InternalId.TweetId(tweetId), _) => Some(tweetId)
+            case (_, InternalId.TweetId(tweetId)) => Some(tweetId)
+            case _ => None
+          }
+        case _ => None
+      }
+
+    val result = for {
+      certoScores <- Future.collect(certoStratoStore.multiGet(tweetIds))
+    } yield {
+      ks.map { k =>
+        // The algorithm selects which Certo field is read; the id order is irrelevant.
+        (k.algorithm, k.internalId) match {
+          case (CertoNormalizedDotProductScore, GenericPairScoreId(scoreId)) =>
+            (scoreId.id1, scoreId.id2) match {
+              case (InternalId.TweetId(tweetId), InternalId.TopicId(topicId)) =>
+                (
+                  k,
+                  extractScore(
+                    tweetId,
+                    topicId,
+                    certoScores,
+                    _.followerL2NormalizedDotProduct8HrHalfLife))
+              case (InternalId.TopicId(topicId), InternalId.TweetId(tweetId)) =>
+                (
+                  k,
+                  extractScore(
+                    tweetId,
+                    topicId,
+                    certoScores,
+                    _.followerL2NormalizedDotProduct8HrHalfLife))
+              case _ => (k, None)
+            }
+          case (CertoNormalizedCosineScore, GenericPairScoreId(scoreId)) =>
+            (scoreId.id1, scoreId.id2) match {
+              case (InternalId.TweetId(tweetId), InternalId.TopicId(topicId)) =>
+                (
+                  k,
+                  extractScore(
+                    tweetId,
+                    topicId,
+                    certoScores,
+                    _.followerL2NormalizedCosineSimilarity8HrHalfLife))
+              case (InternalId.TopicId(topicId), InternalId.TweetId(tweetId)) =>
+                (
+                  k,
+                  extractScore(
+                    tweetId,
+                    topicId,
+                    certoScores,
+                    _.followerL2NormalizedCosineSimilarity8HrHalfLife))
+              case _ => (k, None)
+            }
+          case _ => (k, None)
+        }
+      }.toMap
+    }
+    FutureOps.liftValues(ks, result)
+  }
+
+  /**
+   * Given tweetToCertoScores, extract certain Certo score between the given tweetId and topicId.
+   * The Certo score of interest is specified using scoreExtractor.
+   *
+   * Returns `None` when the tweet was not looked up at all, or when it has scores but none
+   * for `topicId`; returns a score of 0.0 when the store had no entry for the tweet.
+   */
+  def extractScore(
+    tweetId: TweetId,
+    topicId: TopicId,
+    tweetToCertoScores: Map[TweetId, Option[TopicToScores]],
+    scoreExtractor: Scores => Double
+  ): Option[ThriftScore] = {
+    tweetToCertoScores.get(tweetId).flatMap {
+      case Some(topicToScores) =>
+        topicToScores.topicToScores.flatMap(_.get(topicId).map(scoreExtractor).map(ThriftScore(_)))
+      case _ => Some(ThriftScore(0.0))
+    }
+  }
+}
diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/TopicTweetRankingScoreStore.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/TopicTweetRankingScoreStore.scala
new file mode 100644
index 000000000..9ff502fd6
--- /dev/null
+++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/TopicTweetRankingScoreStore.scala
@@ -0,0 +1,48 @@
+package com.twitter.representationscorer.scorestore
+
+import com.twitter.simclusters_v2.score.WeightedSumAggregatedScoreStore
+import com.twitter.simclusters_v2.score.WeightedSumAggregatedScoreStore.WeightedSumAggregatedScoreParameter
+import com.twitter.simclusters_v2.thriftscala.{EmbeddingType, ModelVersion, ScoringAlgorithm}
+
+object TopicTweetRankingScoreStore {
+  val producerEmbeddingScoreMultiplier = 1.0
+  val consumerEmbeddingScoreMultiplier = 1.0
+
+  /**
+   * Build the scoring store for TopicTweet Ranking based on Default Multipliers.
+   * If you want to compare the ranking between different multipliers, register a new
+   * ScoringAlgorithm and let upstream callers select the scoring algorithm via params.
+ */ + def buildTopicTweetRankingStore( + consumerEmbeddingType: EmbeddingType, + producerEmbeddingType: EmbeddingType, + tweetEmbeddingType: EmbeddingType, + modelVersion: ModelVersion, + consumerEmbeddingMultiplier: Double = consumerEmbeddingScoreMultiplier, + producerEmbeddingMultiplier: Double = producerEmbeddingScoreMultiplier + ): WeightedSumAggregatedScoreStore = { + WeightedSumAggregatedScoreStore( + List( + WeightedSumAggregatedScoreParameter( + ScoringAlgorithm.PairEmbeddingCosineSimilarity, + consumerEmbeddingMultiplier, + WeightedSumAggregatedScoreStore.genericPairScoreIdToSimClustersEmbeddingPairScoreId( + consumerEmbeddingType, + tweetEmbeddingType, + modelVersion + ) + ), + WeightedSumAggregatedScoreParameter( + ScoringAlgorithm.PairEmbeddingCosineSimilarity, + producerEmbeddingMultiplier, + WeightedSumAggregatedScoreStore.genericPairScoreIdToSimClustersEmbeddingPairScoreId( + producerEmbeddingType, + tweetEmbeddingType, + modelVersion + ) + ) + ) + ) + } + +} diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/TopicTweetsCosineSimilarityAggregateStore.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/TopicTweetsCosineSimilarityAggregateStore.scala new file mode 100644 index 000000000..f835158b8 --- /dev/null +++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore/TopicTweetsCosineSimilarityAggregateStore.scala @@ -0,0 +1,148 @@ +package com.twitter.representationscorer.scorestore + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.common.util.StatsUtil +import com.twitter.representationscorer.scorestore.TopicTweetsCosineSimilarityAggregateStore.ScoreKey +import com.twitter.simclusters_v2.common.TweetId +import com.twitter.simclusters_v2.score.AggregatedScoreStore +import com.twitter.simclusters_v2.thriftscala.ScoreInternalId.GenericPairScoreId +import 
com.twitter.simclusters_v2.thriftscala.ScoringAlgorithm.CortexTopicTweetLabel +import com.twitter.simclusters_v2.thriftscala.{ + EmbeddingType, + InternalId, + ModelVersion, + ScoreInternalId, + ScoringAlgorithm, + SimClustersEmbeddingId, + TopicId, + Score => ThriftScore, + ScoreId => ThriftScoreId, + SimClustersEmbeddingPairScoreId => ThriftSimClustersEmbeddingPairScoreId +} +import com.twitter.storehaus.ReadableStore +import com.twitter.topic_recos.common.Configs.{DefaultModelVersion, MinCosineSimilarityScore} +import com.twitter.topic_recos.common._ +import com.twitter.util.Future + +/** + * Calculates the cosine similarity scores of arbitrary combinations of TopicEmbeddings and + * TweetEmbeddings. + * The class has 2 uses: + * 1. For internal uses. TSP will call this store to fetch the raw scores for (topic, tweet) with + * all available embedding types. We calculate all the scores here, so the caller can do filtering + * & score caching on their side. This will make it possible to DDG different embedding scores. + * + * 2. For external calls from Cortex. We return true (or 1.0) for any given (topic, tweet) if their + * cosine similarity passes the threshold for any of the embedding types. + * The expected input type is + * ScoreId( + * PairEmbeddingCosineSimilarity, + * GenericPairScoreId(TopicId, TweetId) + * ) + */ +case class TopicTweetsCosineSimilarityAggregateStore( + scoreKeys: Seq[ScoreKey], + statsReceiver: StatsReceiver) + extends AggregatedScoreStore { + + def toCortexScore(scoresMap: Map[ScoreKey, Double]): Double = { + val passThreshold = scoresMap.exists { + case (_, score) => score >= MinCosineSimilarityScore + } + if (passThreshold) 1.0 else 0.0 + } + + /** + * To be called by Cortex through Unified Score API ONLY. Calculates all possible (topic, tweet), + * return 1.0 if any of the embedding scores passes the minimum threshold. 
+   *
+   * Expects a ScoreId whose algorithm is CortexTopicTweetLabel and whose internalId is a
+   * GenericPairScoreId holding a TopicId and a TweetId, in either order. Any other key
+   * yields Future.None.
+   */
+  override def get(k: ThriftScoreId): Future[Option[ThriftScore]] = {
+    StatsUtil.trackOptionStats(statsReceiver) {
+      (k.algorithm, k.internalId) match {
+        case (CortexTopicTweetLabel, GenericPairScoreId(genericPairScoreId)) =>
+          (genericPairScoreId.id1, genericPairScoreId.id2) match {
+            case (InternalId.TopicId(topicId), InternalId.TweetId(tweetId)) =>
+              TopicTweetsCosineSimilarityAggregateStore
+                .getRawScoresMap(topicId, tweetId, scoreKeys, scoreFacadeStore)
+                .map { scoresMap => Some(ThriftScore(toCortexScore(scoresMap))) }
+            case (InternalId.TweetId(tweetId), InternalId.TopicId(topicId)) =>
+              // Same lookup, with the two ids supplied in the opposite order.
+              TopicTweetsCosineSimilarityAggregateStore
+                .getRawScoresMap(topicId, tweetId, scoreKeys, scoreFacadeStore)
+                .map { scoresMap => Some(ThriftScore(toCortexScore(scoresMap))) }
+            case _ =>
+              // Do not accept other InternalId combinations
+              Future.None
+          }
+        case _ =>
+          // Do not accept other Id types for now
+          Future.None
+      }
+    }
+  }
+}
+
+object TopicTweetsCosineSimilarityAggregateStore {
+
+  // Topic embedding types that are combined with TweetEmbeddingTypes and ModelVersions
+  // to build DefaultScoreKeys.
+  val TopicEmbeddingTypes: Seq[EmbeddingType] =
+    Seq(
+      EmbeddingType.FavTfgTopic,
+      EmbeddingType.LogFavBasedKgoApeTopic
+    )
+
+  // Add the new embedding types if want to test the new Tweet embedding performance.
+ val TweetEmbeddingTypes: Seq[EmbeddingType] = Seq(EmbeddingType.LogFavBasedTweet) + + val ModelVersions: Seq[ModelVersion] = + Seq(DefaultModelVersion) + + val DefaultScoreKeys: Seq[ScoreKey] = { + for { + modelVersion <- ModelVersions + topicEmbeddingType <- TopicEmbeddingTypes + tweetEmbeddingType <- TweetEmbeddingTypes + } yield { + ScoreKey( + topicEmbeddingType = topicEmbeddingType, + tweetEmbeddingType = tweetEmbeddingType, + modelVersion = modelVersion + ) + } + } + case class ScoreKey( + topicEmbeddingType: EmbeddingType, + tweetEmbeddingType: EmbeddingType, + modelVersion: ModelVersion) + + def getRawScoresMap( + topicId: TopicId, + tweetId: TweetId, + scoreKeys: Seq[ScoreKey], + uniformScoringStore: ReadableStore[ThriftScoreId, ThriftScore] + ): Future[Map[ScoreKey, Double]] = { + val scoresMapFut = scoreKeys.map { key => + val scoreInternalId = ScoreInternalId.SimClustersEmbeddingPairScoreId( + ThriftSimClustersEmbeddingPairScoreId( + buildTopicEmbedding(topicId, key.topicEmbeddingType, key.modelVersion), + SimClustersEmbeddingId( + key.tweetEmbeddingType, + key.modelVersion, + InternalId.TweetId(tweetId)) + )) + val scoreFut = uniformScoringStore + .get( + ThriftScoreId( + algorithm = ScoringAlgorithm.PairEmbeddingCosineSimilarity, // Hard code as cosine sim + internalId = scoreInternalId + )) + key -> scoreFut + }.toMap + + Future + .collect(scoresMapFut).map(_.collect { + case (key, Some(ThriftScore(score))) => + (key, score) + }) + } +} diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/BUILD b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/BUILD new file mode 100644 index 000000000..1c617e9a0 --- /dev/null +++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/BUILD @@ -0,0 +1,20 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + platform = "java8", + tags = ["bazel-compatible"], + 
dependencies = [ + "3rdparty/jvm/com/github/ben-manes/caffeine", + "finatra/inject/inject-core/src/main/scala", + "representation-scorer/server/src/main/scala/com/twitter/representationscorer/common", + "representation-scorer/server/src/main/scala/com/twitter/representationscorer/scorestore", + "representation-scorer/server/src/main/thrift:thrift-scala", + "src/thrift/com/twitter/twistly:twistly-scala", + "stitch/stitch-core", + "stitch/stitch-core:cache", + "strato/config/columns/recommendations/twistly:twistly-strato-client", + "strato/config/columns/recommendations/user-signal-service:user-signal-service-strato-client", + "strato/src/main/scala/com/twitter/strato/client", + "user-signal-service/thrift/src/main/thrift:thrift-scala", + "util/util-core", + ], +) diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/Engagements.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/Engagements.scala new file mode 100644 index 000000000..2da828ce6 --- /dev/null +++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/Engagements.scala @@ -0,0 +1,65 @@ +package com.twitter.representationscorer.twistlyfeatures + +import com.twitter.conversions.DurationOps._ +import com.twitter.util.Duration +import com.twitter.util.Time + +case class Engagements( + favs7d: Seq[UserSignal] = Nil, + retweets7d: Seq[UserSignal] = Nil, + follows30d: Seq[UserSignal] = Nil, + shares7d: Seq[UserSignal] = Nil, + replies7d: Seq[UserSignal] = Nil, + originalTweets7d: Seq[UserSignal] = Nil, + videoPlaybacks7d: Seq[UserSignal] = Nil, + block30d: Seq[UserSignal] = Nil, + mute30d: Seq[UserSignal] = Nil, + report30d: Seq[UserSignal] = Nil, + dontlike30d: Seq[UserSignal] = Nil, + seeFewer30d: Seq[UserSignal] = Nil) { + + import Engagements._ + + private val now = Time.now + private val oneDayAgo = (now - OneDaySpan).inMillis + private val sevenDaysAgo = (now - 
SevenDaysSpan).inMillis
+
+  // Target ids of all signals, split by what they point at: tweets vs. accounts.
+  // Both lists are plain concatenations, so an id that occurs in several signals
+  // (or several times in one signal) is repeated.
+  val tweetIds: Seq[Long] =
+    (favs7d ++ retweets7d ++ shares7d
+      ++ replies7d ++ originalTweets7d ++ videoPlaybacks7d
+      ++ report30d ++ dontlike30d ++ seeFewer30d)
+      .map(_.targetId)
+  val authorIds: Seq[Long] = (follows30d ++ block30d ++ mute30d).map(_.targetId)
+
+  // Tweet signals: 30-day inputs narrowed to the 7-day window
+  val dontlike7d: Seq[UserSignal] = dontlike30d.filter(_.timestamp > sevenDaysAgo)
+  val seeFewer7d: Seq[UserSignal] = seeFewer30d.filter(_.timestamp > sevenDaysAgo)
+
+  // Tweet signals: 7-day lists narrowed to the 1-day window
+  val favs1d: Seq[UserSignal] = favs7d.filter(_.timestamp > oneDayAgo)
+  val retweets1d: Seq[UserSignal] = retweets7d.filter(_.timestamp > oneDayAgo)
+  val shares1d: Seq[UserSignal] = shares7d.filter(_.timestamp > oneDayAgo)
+  val replies1d: Seq[UserSignal] = replies7d.filter(_.timestamp > oneDayAgo)
+  val originalTweets1d: Seq[UserSignal] = originalTweets7d.filter(_.timestamp > oneDayAgo)
+  val videoPlaybacks1d: Seq[UserSignal] = videoPlaybacks7d.filter(_.timestamp > oneDayAgo)
+  val dontlike1d: Seq[UserSignal] = dontlike7d.filter(_.timestamp > oneDayAgo)
+  val seeFewer1d: Seq[UserSignal] = seeFewer7d.filter(_.timestamp > oneDayAgo)
+
+  // Account signals (follow, block, mute) narrowed to the 7-day window.
+  // NOTE: report* is listed here as well, but its target ids are tweets
+  // (report30d is part of tweetIds above, not authorIds).
+  val follows7d: Seq[UserSignal] = follows30d.filter(_.timestamp > sevenDaysAgo)
+  val block7d: Seq[UserSignal] = block30d.filter(_.timestamp > sevenDaysAgo)
+  val mute7d: Seq[UserSignal] = mute30d.filter(_.timestamp > sevenDaysAgo)
+  val report7d: Seq[UserSignal] = report30d.filter(_.timestamp > sevenDaysAgo)
+
+  // The same signals narrowed further to the 1-day window
+  val block1d: Seq[UserSignal] = block7d.filter(_.timestamp > oneDayAgo)
+  val mute1d: Seq[UserSignal] = mute7d.filter(_.timestamp > oneDayAgo)
+  val report1d: Seq[UserSignal] = report7d.filter(_.timestamp > oneDayAgo)
+}
+
+object Engagements {
+  // Look-back windows used to bucket signals by age.
+  val OneDaySpan: Duration = 1.days
+  val SevenDaysSpan: Duration = 7.days
+  val ThirtyDaysSpan: Duration = 30.days
+}
+
+// A single engagement: the id it targets and when it happened. The timestamp is compared
+// against cutoffs expressed in epoch milliseconds (see the `inMillis` values above).
+case class UserSignal(targetId: Long, timestamp: Long)
diff --git
a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/ScoreResult.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/ScoreResult.scala new file mode 100644 index 000000000..71df34a19 --- /dev/null +++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/ScoreResult.scala @@ -0,0 +1,3 @@ +package com.twitter.representationscorer.twistlyfeatures + +case class ScoreResult(id: Long, score: Option[Double]) diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/Scorer.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/Scorer.scala new file mode 100644 index 000000000..731412d0a --- /dev/null +++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/Scorer.scala @@ -0,0 +1,474 @@ +package com.twitter.representationscorer.twistlyfeatures + +import com.twitter.finagle.stats.Counter +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.representationscorer.common.TweetId +import com.twitter.representationscorer.common.UserId +import com.twitter.representationscorer.scorestore.ScoreStore +import com.twitter.representationscorer.thriftscala.SimClustersRecentEngagementSimilarities +import com.twitter.simclusters_v2.thriftscala.EmbeddingType +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.simclusters_v2.thriftscala.ModelVersion +import com.twitter.simclusters_v2.thriftscala.ScoreId +import com.twitter.simclusters_v2.thriftscala.ScoreInternalId +import com.twitter.simclusters_v2.thriftscala.ScoringAlgorithm +import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId +import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingPairScoreId +import com.twitter.stitch.Stitch +import javax.inject.Inject + +class Scorer @Inject() ( + 
fetchEngagementsFromUSS: Long => Stitch[Engagements], + scoreStore: ScoreStore, + stats: StatsReceiver) { + + import Scorer._ + + private val scoreStats = stats.scope("score") + private val scoreCalculationStats = scoreStats.scope("calculation") + private val scoreResultStats = scoreStats.scope("result") + + private val scoresNonEmptyCounter = scoreResultStats.scope("all").counter("nonEmpty") + private val scoresNonZeroCounter = scoreResultStats.scope("all").counter("nonZero") + + private val tweetScoreStats = scoreCalculationStats.scope("tweetScore").stat("latency") + private val userScoreStats = scoreCalculationStats.scope("userScore").stat("latency") + + private val favNonZero = scoreResultStats.scope("favs").counter("nonZero") + private val favNonEmpty = scoreResultStats.scope("favs").counter("nonEmpty") + + private val retweetsNonZero = scoreResultStats.scope("retweets").counter("nonZero") + private val retweetsNonEmpty = scoreResultStats.scope("retweets").counter("nonEmpty") + + private val followsNonZero = scoreResultStats.scope("follows").counter("nonZero") + private val followsNonEmpty = scoreResultStats.scope("follows").counter("nonEmpty") + + private val sharesNonZero = scoreResultStats.scope("shares").counter("nonZero") + private val sharesNonEmpty = scoreResultStats.scope("shares").counter("nonEmpty") + + private val repliesNonZero = scoreResultStats.scope("replies").counter("nonZero") + private val repliesNonEmpty = scoreResultStats.scope("replies").counter("nonEmpty") + + private val originalTweetsNonZero = scoreResultStats.scope("originalTweets").counter("nonZero") + private val originalTweetsNonEmpty = scoreResultStats.scope("originalTweets").counter("nonEmpty") + + private val videoViewsNonZero = scoreResultStats.scope("videoViews").counter("nonZero") + private val videoViewsNonEmpty = scoreResultStats.scope("videoViews").counter("nonEmpty") + + private val blockNonZero = scoreResultStats.scope("block").counter("nonZero") + private val 
blockNonEmpty = scoreResultStats.scope("block").counter("nonEmpty") + + private val muteNonZero = scoreResultStats.scope("mute").counter("nonZero") + private val muteNonEmpty = scoreResultStats.scope("mute").counter("nonEmpty") + + private val reportNonZero = scoreResultStats.scope("report").counter("nonZero") + private val reportNonEmpty = scoreResultStats.scope("report").counter("nonEmpty") + + private val dontlikeNonZero = scoreResultStats.scope("dontlike").counter("nonZero") + private val dontlikeNonEmpty = scoreResultStats.scope("dontlike").counter("nonEmpty") + + private val seeFewerNonZero = scoreResultStats.scope("seeFewer").counter("nonZero") + private val seeFewerNonEmpty = scoreResultStats.scope("seeFewer").counter("nonEmpty") + + private def getTweetScores( + candidateTweetId: TweetId, + sourceTweetIds: Seq[TweetId] + ): Stitch[Seq[ScoreResult]] = { + val getScoresStitch = Stitch.traverse(sourceTweetIds) { sourceTweetId => + scoreStore + .uniformScoringStoreStitch(getTweetScoreId(sourceTweetId, candidateTweetId)) + .liftNotFoundToOption + .map(score => ScoreResult(sourceTweetId, score.map(_.score))) + } + + Stitch.time(getScoresStitch).flatMap { + case (tryResult, duration) => + tweetScoreStats.add(duration.inMillis) + Stitch.const(tryResult) + } + } + + private def getUserScores( + tweetId: TweetId, + authorIds: Seq[UserId] + ): Stitch[Seq[ScoreResult]] = { + val getScoresStitch = Stitch.traverse(authorIds) { authorId => + scoreStore + .uniformScoringStoreStitch(getAuthorScoreId(authorId, tweetId)) + .liftNotFoundToOption + .map(score => ScoreResult(authorId, score.map(_.score))) + } + + Stitch.time(getScoresStitch).flatMap { + case (tryResult, duration) => + userScoreStats.add(duration.inMillis) + Stitch.const(tryResult) + } + } + + /** + * Get the [[SimClustersRecentEngagementSimilarities]] result containing the similarity + * features for the given userId-TweetId. 
+   */
+  def get(
+    userId: UserId,
+    tweetId: TweetId
+  ): Stitch[SimClustersRecentEngagementSimilarities] = {
+    // The batch overload returns exactly one result per requested tweet, so `head` is defined.
+    get(userId, Seq(tweetId)).map(_.head)
+  }
+
+  /**
+   * Get a list of [[SimClustersRecentEngagementSimilarities]] results containing the similarity
+   * features for the given tweets of the user Id.
+   * Guaranteed to be the same number/order as requested.
+   *
+   * Engagement target ids are de-duplicated before scoring. The id lists exposed by
+   * [[Engagements]] are concatenations of several signals, so an id can occur more than once.
+   * Scoring it repeatedly would put several identical entries into the `groupBy(_.id)` buckets
+   * below, and every signal pointing at that id would then contribute its score several times
+   * to the averages.
+   */
+  def get(
+    userId: UserId,
+    tweetIds: Seq[TweetId]
+  ): Stitch[Seq[SimClustersRecentEngagementSimilarities]] = {
+    fetchEngagementsFromUSS(userId)
+      .flatMap { engagements =>
+        val sourceTweetIds = engagements.tweetIds.distinct
+        val sourceAuthorIds = engagements.authorIds.distinct
+
+        // For each tweet received in the request, compute the similarity scores between them
+        // and the user signals fetched from USS.
+        Stitch
+          .join(
+            Stitch.traverse(tweetIds)(id => getTweetScores(id, sourceTweetIds)),
+            Stitch.traverse(tweetIds)(id => getUserScores(id, sourceAuthorIds))
+          )
+          .map {
+            case (tweetScoresSeq, userScoreSeq) =>
+              // All seq have = size because when scores don't exist, they are returned as Option
+              (tweetScoresSeq, userScoreSeq).zipped.map { (tweetScores, userScores) =>
+                computeSimilarityScoresPerTweet(
+                  engagements,
+                  tweetScores.groupBy(_.id),
+                  userScores.groupBy(_.id))
+              }
+          }
+      }
+  }
+
+  /**
+   *
+   * Computes the [[SimClustersRecentEngagementSimilarities]]
+   * using the given tweet-tweet and user-tweet scores in TweetScoresMap
+   * and the user signals in [[Engagements]].
+   *
+   * Signals that target tweets are resolved against `tweetScores`; signals that target
+   * accounts (follow, block, mute) are resolved against `authorScores`.
+   *
+   * @param engagements  the user's recent signals, bucketed by type and time window
+   * @param tweetScores  scores of the candidate tweet against engaged tweets, keyed by tweet id
+   * @param authorScores scores of the candidate tweet against engaged accounts, keyed by user id
+   * @return max/avg aggregates of the available scores for every signal type and window
+   */
+  private def computeSimilarityScoresPerTweet(
+    engagements: Engagements,
+    tweetScores: Map[TweetId, Seq[ScoreResult]],
+    authorScores: Map[UserId, Seq[ScoreResult]]
+  ): SimClustersRecentEngagementSimilarities = {
+    // Scores available for the targets of `signals`; targets without an entry and
+    // entries without a score are dropped.
+    def scoresFor(signals: Seq[UserSignal], scoresById: Map[Long, Seq[ScoreResult]]): Seq[Double] =
+      signals.view
+        .flatMap(s => scoresById.get(s.targetId))
+        .flatten.flatMap(_.score)
+        .force
+
+    def forTweets(signals: Seq[UserSignal]): Seq[Double] = scoresFor(signals, tweetScores)
+    def forAuthors(signals: Seq[UserSignal]): Seq[Double] = scoresFor(signals, authorScores)
+
+    val favs7d = forTweets(engagements.favs7d)
+    val favs1d = forTweets(engagements.favs1d)
+
+    val retweets7d = forTweets(engagements.retweets7d)
+    val retweets1d = forTweets(engagements.retweets1d)
+
+    val follows30d = forAuthors(engagements.follows30d)
+    val follows7d = forAuthors(engagements.follows7d)
+
+    val shares7d = forTweets(engagements.shares7d)
+    val shares1d = forTweets(engagements.shares1d)
+
+    val replies7d = forTweets(engagements.replies7d)
+    val replies1d = forTweets(engagements.replies1d)
+
+    val originalTweets7d = forTweets(engagements.originalTweets7d)
+    val originalTweets1d = forTweets(engagements.originalTweets1d)
+
+    val videoViews7d = forTweets(engagements.videoPlaybacks7d)
+    val videoViews1d = forTweets(engagements.videoPlaybacks1d)
+
+    // Block and mute target accounts (their ids are part of Engagements.authorIds), so they
+    // must be resolved against the author scores, not the tweet scores.
+    val block30d = forAuthors(engagements.block30d)
+    val block7d = forAuthors(engagements.block7d)
+    val block1d = forAuthors(engagements.block1d)
+
+    val mute30d = forAuthors(engagements.mute30d)
+    val mute7d = forAuthors(engagements.mute7d)
+    val mute1d = forAuthors(engagements.mute1d)
+
+    val report30d = forTweets(engagements.report30d)
+    val report7d = forTweets(engagements.report7d)
+    val report1d = forTweets(engagements.report1d)
+
+    val dontlike30d = forTweets(engagements.dontlike30d)
+    val dontlike7d = forTweets(engagements.dontlike7d)
+    val dontlike1d = forTweets(engagements.dontlike1d)
+
+    val seeFewer30d = forTweets(engagements.seeFewer30d)
+    val seeFewer7d = forTweets(engagements.seeFewer7d)
+    val seeFewer1d = forTweets(engagements.seeFewer1d)
+
+    val result = SimClustersRecentEngagementSimilarities(
+      fav1dLast10Max = max(favs1d),
+      fav1dLast10Avg = avg(favs1d),
+      fav7dLast10Max = max(favs7d),
+      fav7dLast10Avg = avg(favs7d),
+      retweet1dLast10Max = max(retweets1d),
+      retweet1dLast10Avg = avg(retweets1d),
+      retweet7dLast10Max = max(retweets7d),
+      retweet7dLast10Avg = avg(retweets7d),
+      follow7dLast10Max = max(follows7d),
+      follow7dLast10Avg = avg(follows7d),
+      follow30dLast10Max = max(follows30d),
+      follow30dLast10Avg = avg(follows30d),
+      share1dLast10Max = max(shares1d),
+      share1dLast10Avg = avg(shares1d),
+      share7dLast10Max = max(shares7d),
+      share7dLast10Avg = avg(shares7d),
+      reply1dLast10Max = max(replies1d),
+      reply1dLast10Avg = avg(replies1d),
+      reply7dLast10Max = max(replies7d),
+      reply7dLast10Avg = avg(replies7d),
+      originalTweet1dLast10Max = max(originalTweets1d),
+      originalTweet1dLast10Avg = avg(originalTweets1d),
+      originalTweet7dLast10Max = max(originalTweets7d),
+      originalTweet7dLast10Avg = avg(originalTweets7d),
+      videoPlayback1dLast10Max = max(videoViews1d),
+      videoPlayback1dLast10Avg = avg(videoViews1d),
+      videoPlayback7dLast10Max = max(videoViews7d),
+      videoPlayback7dLast10Avg = avg(videoViews7d),
+      block1dLast10Max = max(block1d),
+      block1dLast10Avg = avg(block1d),
+      block7dLast10Max = max(block7d),
+      block7dLast10Avg = avg(block7d),
+      block30dLast10Max = max(block30d),
+      block30dLast10Avg = avg(block30d),
+      mute1dLast10Max = max(mute1d),
+      mute1dLast10Avg = avg(mute1d),
+      mute7dLast10Max = max(mute7d),
+      mute7dLast10Avg = avg(mute7d),
+      mute30dLast10Max = max(mute30d),
+      mute30dLast10Avg = avg(mute30d),
+      report1dLast10Max = max(report1d),
+      report1dLast10Avg = avg(report1d),
+      report7dLast10Max = max(report7d),
+      report7dLast10Avg = avg(report7d),
+      report30dLast10Max = max(report30d),
+      report30dLast10Avg = avg(report30d),
+      dontlike1dLast10Max = max(dontlike1d),
+      dontlike1dLast10Avg = avg(dontlike1d),
+      dontlike7dLast10Max = max(dontlike7d),
+      dontlike7dLast10Avg = avg(dontlike7d),
+      dontlike30dLast10Max = max(dontlike30d),
+      dontlike30dLast10Avg = avg(dontlike30d),
+      seeFewer1dLast10Max = max(seeFewer1d),
+      seeFewer1dLast10Avg = avg(seeFewer1d),
+      seeFewer7dLast10Max = max(seeFewer7d),
+      seeFewer7dLast10Avg = avg(seeFewer7d),
+      seeFewer30dLast10Max = max(seeFewer30d),
+      seeFewer30dLast10Avg = avg(seeFewer30d),
+    )
+    trackStats(result)
+    result
+  }
+
+  private def trackStats(result: SimClustersRecentEngagementSimilarities): Unit = {
+    val scores = Seq(
+      result.fav7dLast10Max,
+      result.retweet7dLast10Max,
+      result.follow30dLast10Max,
+      result.share1dLast10Max,
+      result.share7dLast10Max,
+      result.reply7dLast10Max,
+      result.originalTweet7dLast10Max,
+      result.videoPlayback7dLast10Max,
+      result.block30dLast10Max,
+      result.mute30dLast10Max,
+      result.report30dLast10Max,
+      result.dontlike30dLast10Max,
+      result.seeFewer30dLast10Max
+    )
+
+    val nonEmpty = scores.exists(_.isDefined)
+    val nonZero = scores.exists { case Some(score) if score > 0 => true; case _ => false }
+
+    if (nonEmpty) {
+      scoresNonEmptyCounter.incr()
+    }
+
+    if (nonZero) {
+      scoresNonZeroCounter.incr()
+    }
+
+    // We use the largest window of a given type of score,
+    // because the largest window is inclusive of smaller windows.
+    trackSignalStats(favNonEmpty, favNonZero, result.fav7dLast10Avg)
+    trackSignalStats(retweetsNonEmpty, retweetsNonZero, result.retweet7dLast10Avg)
+    trackSignalStats(followsNonEmpty, followsNonZero, result.follow30dLast10Avg)
+    trackSignalStats(sharesNonEmpty, sharesNonZero, result.share7dLast10Avg)
+    trackSignalStats(repliesNonEmpty, repliesNonZero, result.reply7dLast10Avg)
+    trackSignalStats(originalTweetsNonEmpty, originalTweetsNonZero, result.originalTweet7dLast10Avg)
+    trackSignalStats(videoViewsNonEmpty, videoViewsNonZero, result.videoPlayback7dLast10Avg)
+    trackSignalStats(blockNonEmpty, blockNonZero, result.block30dLast10Avg)
+    trackSignalStats(muteNonEmpty, muteNonZero, result.mute30dLast10Avg)
+    trackSignalStats(reportNonEmpty, reportNonZero, result.report30dLast10Avg)
+    trackSignalStats(dontlikeNonEmpty, dontlikeNonZero, result.dontlike30dLast10Avg)
+    trackSignalStats(seeFewerNonEmpty, seeFewerNonZero, result.seeFewer30dLast10Avg)
+  }
+
+  /**
+   * Increments `nonEmpty` when a score is present and, additionally, `nonZero` when that
+   * score is strictly positive. Does nothing for None.
+   */
+  private def trackSignalStats(nonEmpty: Counter, nonZero: Counter, score: Option[Double]): Unit = {
+    score.foreach { value =>
+      nonEmpty.incr()
+
+      if (value > 0)
+        nonZero.incr()
+    }
+  }
+}
+
+object Scorer {
+
+  /** Arithmetic mean of `s`, or None when `s` is empty. */
+  def avg(s: Traversable[Double]): Option[Double] =
+    if (s.isEmpty) None else Some(s.sum / s.size)
+
+  /**
+   * Largest element of `s`, or None when `s` is empty.
+   *
+   * The reduction starts from the first element rather than from 0.0, so a collection that
+   * holds only negative values yields its real maximum instead of 0.0.
+   */
+  def max(s: Traversable[Double]): Option[Double] =
+    if (s.isEmpty) None else Some(s.reduceLeft((best, next) => math.max(best, next)))
+
+  private def getAuthorScoreId(
+    userId: UserId,
+    tweetId: TweetId
+  ) = {
+    ScoreId(
+      algorithm = ScoringAlgorithm.PairEmbeddingCosineSimilarity,
+      internalId = ScoreInternalId.SimClustersEmbeddingPairScoreId(
+        SimClustersEmbeddingPairScoreId(
+          SimClustersEmbeddingId(
+            internalId = InternalId.UserId(userId),
+            modelVersion = ModelVersion.Model20m145k2020,
+            embeddingType = EmbeddingType.FavBasedProducer
+          ),
+          SimClustersEmbeddingId(
+            internalId = InternalId.TweetId(tweetId),
+            modelVersion = ModelVersion.Model20m145k2020,
+            embeddingType =
EmbeddingType.LogFavBasedTweet + ) + )) + ) + } + + private def getTweetScoreId( + sourceTweetId: TweetId, + candidateTweetId: TweetId + ) = { + ScoreId( + algorithm = ScoringAlgorithm.PairEmbeddingCosineSimilarity, + internalId = ScoreInternalId.SimClustersEmbeddingPairScoreId( + SimClustersEmbeddingPairScoreId( + SimClustersEmbeddingId( + internalId = InternalId.TweetId(sourceTweetId), + modelVersion = ModelVersion.Model20m145k2020, + embeddingType = EmbeddingType.LogFavLongestL2EmbeddingTweet + ), + SimClustersEmbeddingId( + internalId = InternalId.TweetId(candidateTweetId), + modelVersion = ModelVersion.Model20m145k2020, + embeddingType = EmbeddingType.LogFavBasedTweet + ) + )) + ) + } +} diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/UserSignalServiceRecentEngagementsClient.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/UserSignalServiceRecentEngagementsClient.scala new file mode 100644 index 000000000..fb09c1e57 --- /dev/null +++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/UserSignalServiceRecentEngagementsClient.scala @@ -0,0 +1,155 @@ +package com.twitter.representationscorer.twistlyfeatures + +import com.twitter.decider.SimpleRecipient +import com.twitter.finagle.stats.Stat +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.representationscorer.common._ +import com.twitter.representationscorer.twistlyfeatures.Engagements._ +import com.twitter.simclusters_v2.common.SimClustersEmbeddingId.LongInternalId +import com.twitter.stitch.Stitch +import com.twitter.strato.generated.client.recommendations.user_signal_service.SignalsClientColumn +import com.twitter.strato.generated.client.recommendations.user_signal_service.SignalsClientColumn.Value +import com.twitter.usersignalservice.thriftscala.BatchSignalRequest +import com.twitter.usersignalservice.thriftscala.SignalRequest +import 
com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Time +import scala.collection.mutable.ArrayBuffer +import com.twitter.usersignalservice.thriftscala.ClientIdentifier + +class UserSignalServiceRecentEngagementsClient( + stratoClient: SignalsClientColumn, + decider: RepresentationScorerDecider, + stats: StatsReceiver) { + + import UserSignalServiceRecentEngagementsClient._ + + private val signalStats = stats.scope("user-signal-service", "signal") + private val signalTypeStats: Map[SignalType, Stat] = + SignalType.list.map(s => (s, signalStats.scope(s.name).stat("size"))).toMap + + def get(userId: UserId): Stitch[Engagements] = { + val request = buildRequest(userId) + stratoClient.fetcher.fetch(request).map(_.v).lowerFromOption().map { response => + val now = Time.now + val sevenDaysAgo = now - SevenDaysSpan + val thirtyDaysAgo = now - ThirtyDaysSpan + + Engagements( + favs7d = getUserSignals(response, SignalType.TweetFavorite, sevenDaysAgo), + retweets7d = getUserSignals(response, SignalType.Retweet, sevenDaysAgo), + follows30d = getUserSignals(response, SignalType.AccountFollowWithDelay, thirtyDaysAgo), + shares7d = getUserSignals(response, SignalType.TweetShareV1, sevenDaysAgo), + replies7d = getUserSignals(response, SignalType.Reply, sevenDaysAgo), + originalTweets7d = getUserSignals(response, SignalType.OriginalTweet, sevenDaysAgo), + videoPlaybacks7d = + getUserSignals(response, SignalType.VideoView90dPlayback50V1, sevenDaysAgo), + block30d = getUserSignals(response, SignalType.AccountBlock, thirtyDaysAgo), + mute30d = getUserSignals(response, SignalType.AccountMute, thirtyDaysAgo), + report30d = getUserSignals(response, SignalType.TweetReport, thirtyDaysAgo), + dontlike30d = getUserSignals(response, SignalType.TweetDontLike, thirtyDaysAgo), + seeFewer30d = getUserSignals(response, SignalType.TweetSeeFewer, thirtyDaysAgo), + ) + } + } + + private def getUserSignals( + response: Value, + signalType: SignalType, + 
earliestValidTimestamp: Time + ): Seq[UserSignal] = { + val signals = response.signalResponse + .getOrElse(signalType, Seq.empty) + .view + .filter(_.timestamp > earliestValidTimestamp.inMillis) + .map(s => s.targetInternalId.collect { case LongInternalId(id) => (id, s.timestamp) }) + .collect { case Some((id, engagedAt)) => UserSignal(id, engagedAt) } + .take(EngagementsToScore) + .force + + signalTypeStats(signalType).add(signals.size) + signals + } + + private def buildRequest(userId: Long) = { + val recipient = Some(SimpleRecipient(userId)) + + // Signals RSX always fetches + val requestSignals = ArrayBuffer( + SignalRequestFav, + SignalRequestRetweet, + SignalRequestFollow + ) + + // Signals under experimentation. We use individual deciders to disable them if necessary. + // If experiments are successful, they will become permanent. + if (decider.isAvailable(FetchSignalShareDeciderKey, recipient)) + requestSignals.append(SignalRequestShare) + + if (decider.isAvailable(FetchSignalReplyDeciderKey, recipient)) + requestSignals.append(SignalRequestReply) + + if (decider.isAvailable(FetchSignalOriginalTweetDeciderKey, recipient)) + requestSignals.append(SignalRequestOriginalTweet) + + if (decider.isAvailable(FetchSignalVideoPlaybackDeciderKey, recipient)) + requestSignals.append(SignalRequestVideoPlayback) + + if (decider.isAvailable(FetchSignalBlockDeciderKey, recipient)) + requestSignals.append(SignalRequestBlock) + + if (decider.isAvailable(FetchSignalMuteDeciderKey, recipient)) + requestSignals.append(SignalRequestMute) + + if (decider.isAvailable(FetchSignalReportDeciderKey, recipient)) + requestSignals.append(SignalRequestReport) + + if (decider.isAvailable(FetchSignalDontlikeDeciderKey, recipient)) + requestSignals.append(SignalRequestDontlike) + + if (decider.isAvailable(FetchSignalSeeFewerDeciderKey, recipient)) + requestSignals.append(SignalRequestSeeFewer) + + BatchSignalRequest(userId, requestSignals, Some(ClientIdentifier.RepresentationScorerHome)) + } 
+} + +object UserSignalServiceRecentEngagementsClient { + val FetchSignalShareDeciderKey = "representation_scorer_fetch_signal_share" + val FetchSignalReplyDeciderKey = "representation_scorer_fetch_signal_reply" + val FetchSignalOriginalTweetDeciderKey = "representation_scorer_fetch_signal_original_tweet" + val FetchSignalVideoPlaybackDeciderKey = "representation_scorer_fetch_signal_video_playback" + val FetchSignalBlockDeciderKey = "representation_scorer_fetch_signal_block" + val FetchSignalMuteDeciderKey = "representation_scorer_fetch_signal_mute" + val FetchSignalReportDeciderKey = "representation_scorer_fetch_signal_report" + val FetchSignalDontlikeDeciderKey = "representation_scorer_fetch_signal_dont_like" + val FetchSignalSeeFewerDeciderKey = "representation_scorer_fetch_signal_see_fewer" + + val EngagementsToScore = 10 + private val engagementsToScoreOpt: Option[Long] = Some(EngagementsToScore) + + val SignalRequestFav: SignalRequest = + SignalRequest(engagementsToScoreOpt, SignalType.TweetFavorite) + val SignalRequestRetweet: SignalRequest = SignalRequest(engagementsToScoreOpt, SignalType.Retweet) + val SignalRequestFollow: SignalRequest = + SignalRequest(engagementsToScoreOpt, SignalType.AccountFollowWithDelay) + // New experimental signals + val SignalRequestShare: SignalRequest = + SignalRequest(engagementsToScoreOpt, SignalType.TweetShareV1) + val SignalRequestReply: SignalRequest = SignalRequest(engagementsToScoreOpt, SignalType.Reply) + val SignalRequestOriginalTweet: SignalRequest = + SignalRequest(engagementsToScoreOpt, SignalType.OriginalTweet) + val SignalRequestVideoPlayback: SignalRequest = + SignalRequest(engagementsToScoreOpt, SignalType.VideoView90dPlayback50V1) + + // Negative signals + val SignalRequestBlock: SignalRequest = + SignalRequest(engagementsToScoreOpt, SignalType.AccountBlock) + val SignalRequestMute: SignalRequest = + SignalRequest(engagementsToScoreOpt, SignalType.AccountMute) + val SignalRequestReport: SignalRequest = + 
SignalRequest(engagementsToScoreOpt, SignalType.TweetReport) + val SignalRequestDontlike: SignalRequest = + SignalRequest(engagementsToScoreOpt, SignalType.TweetDontLike) + val SignalRequestSeeFewer: SignalRequest = + SignalRequest(engagementsToScoreOpt, SignalType.TweetSeeFewer) +} diff --git a/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/UserSignalServiceRecentEngagementsClientModule.scala b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/UserSignalServiceRecentEngagementsClientModule.scala new file mode 100644 index 000000000..ee9f61df4 --- /dev/null +++ b/representation-scorer/server/src/main/scala/com/twitter/representationscorer/twistlyfeatures/UserSignalServiceRecentEngagementsClientModule.scala @@ -0,0 +1,57 @@ +package com.twitter.representationscorer.twistlyfeatures + +import com.github.benmanes.caffeine.cache.Caffeine +import com.twitter.stitch.cache.EvictingCache +import com.google.inject.Provides +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.inject.TwitterModule +import com.twitter.representationscorer.common.RepresentationScorerDecider +import com.twitter.stitch.Stitch +import com.twitter.stitch.cache.ConcurrentMapCache +import com.twitter.stitch.cache.MemoizeQuery +import com.twitter.strato.client.Client +import com.twitter.strato.generated.client.recommendations.user_signal_service.SignalsClientColumn +import java.util.concurrent.ConcurrentMap +import java.util.concurrent.TimeUnit +import javax.inject.Singleton + +object UserSignalServiceRecentEngagementsClientModule extends TwitterModule { + + @Singleton + @Provides + def provide( + client: Client, + decider: RepresentationScorerDecider, + statsReceiver: StatsReceiver + ): Long => Stitch[Engagements] = { + val stratoClient = new SignalsClientColumn(client) + + /* + This cache holds a user's recent engagements for a short period of time, such that batched requests + for multiple (userid, 
tweetid) pairs don't all need to fetch them. + + [1] Caffeine cache keys/values must be objects, so we cannot use the `Long` primitive directly. + The boxed java.lang.Long works as a key, since it is an object. In most situations the compiler + can see where auto(un)boxing can occur. However, here we seem to need some wrapper functions + with explicit types to allow the boxing to happen. + */ + val mapCache: ConcurrentMap[java.lang.Long, Stitch[Engagements]] = + Caffeine + .newBuilder() + .expireAfterWrite(5, TimeUnit.SECONDS) + .maximumSize( + 1000 // We estimate 5M unique users in a 5-minute period - with 2k RSX instances, assume that one will see < 1k in a 5-second period + ) + .build[java.lang.Long, Stitch[Engagements]] + .asMap + + statsReceiver.provideGauge("ussRecentEngagementsClient", "cache_size") { mapCache.size.toFloat } + + val engagementsClient = + new UserSignalServiceRecentEngagementsClient(stratoClient, decider, statsReceiver) + + val f = (l: java.lang.Long) => engagementsClient.get(l) // See note [1] above + val cachedCall = MemoizeQuery(f, EvictingCache.lazily(new ConcurrentMapCache(mapCache))) + (l: Long) => cachedCall(l) // see note [1] above + } +} diff --git a/representation-scorer/server/src/main/thrift/BUILD b/representation-scorer/server/src/main/thrift/BUILD new file mode 100644 index 000000000..f7ea37675 --- /dev/null +++ b/representation-scorer/server/src/main/thrift/BUILD @@ -0,0 +1,20 @@ +create_thrift_libraries( + base_name = "thrift", + sources = [ + "com/twitter/representationscorer/service.thrift", + ], + platform = "java8", + tags = [ + "bazel-compatible", + ], + dependency_roots = [ + "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift", + ], + generate_languages = [ + "java", + "scala", + "strato", + ], + provides_java_name = "representationscorer-service-thrift-java", + provides_scala_name = "representationscorer-service-thrift-scala", +) diff --git 
a/representation-scorer/server/src/main/thrift/com/twitter/representationscorer/service.thrift b/representation-scorer/server/src/main/thrift/com/twitter/representationscorer/service.thrift new file mode 100644 index 000000000..0e2f23a31 --- /dev/null +++ b/representation-scorer/server/src/main/thrift/com/twitter/representationscorer/service.thrift @@ -0,0 +1,106 @@ +namespace java com.twitter.representationscorer.thriftjava +#@namespace scala com.twitter.representationscorer.thriftscala +#@namespace strato com.twitter.representationscorer + +include "com/twitter/simclusters_v2/identifier.thrift" +include "com/twitter/simclusters_v2/online_store.thrift" +include "com/twitter/simclusters_v2/score.thrift" + +struct SimClustersRecentEngagementSimilarities { + // All scores computed using cosine similarity + // 1 - 1000 Positive Signals + 1: optional double fav1dLast10Max // max score from last 10 faves in the last 1 day + 2: optional double fav1dLast10Avg // avg score from last 10 faves in the last 1 day + 3: optional double fav7dLast10Max // max score from last 10 faves in the last 7 days + 4: optional double fav7dLast10Avg // avg score from last 10 faves in the last 7 days + 5: optional double retweet1dLast10Max // max score from last 10 retweets in the last 1 day + 6: optional double retweet1dLast10Avg // avg score from last 10 retweets in the last 1 day + 7: optional double retweet7dLast10Max // max score from last 10 retweets in the last 7 days + 8: optional double retweet7dLast10Avg // avg score from last 10 retweets in the last 7 days + 9: optional double follow7dLast10Max // max score from the last 10 follows in the last 7 days + 10: optional double follow7dLast10Avg // avg score from the last 10 follows in the last 7 days + 11: optional double follow30dLast10Max // max score from the last 10 follows in the last 30 days + 12: optional double follow30dLast10Avg // avg score from the last 10 follows in the last 30 days + 13: optional double share1dLast10Max // 
max score from last 10 shares in the last 1 day + 14: optional double share1dLast10Avg // avg score from last 10 shares in the last 1 day + 15: optional double share7dLast10Max // max score from last 10 shares in the last 7 days + 16: optional double share7dLast10Avg // avg score from last 10 shares in the last 7 days + 17: optional double reply1dLast10Max // max score from last 10 replies in the last 1 day + 18: optional double reply1dLast10Avg // avg score from last 10 replies in the last 1 day + 19: optional double reply7dLast10Max // max score from last 10 replies in the last 7 days + 20: optional double reply7dLast10Avg // avg score from last 10 replies in the last 7 days + 21: optional double originalTweet1dLast10Max // max score from last 10 original tweets in the last 1 day + 22: optional double originalTweet1dLast10Avg // avg score from last 10 original tweets in the last 1 day + 23: optional double originalTweet7dLast10Max // max score from last 10 original tweets in the last 7 days + 24: optional double originalTweet7dLast10Avg // avg score from last 10 original tweets in the last 7 days + 25: optional double videoPlayback1dLast10Max // max score from last 10 video playback50 in the last 1 day + 26: optional double videoPlayback1dLast10Avg // avg score from last 10 video playback50 in the last 1 day + 27: optional double videoPlayback7dLast10Max // max score from last 10 video playback50 in the last 7 days + 28: optional double videoPlayback7dLast10Avg // avg score from last 10 video playback50 in the last 7 days + + // 1001 - 2000 Implicit Signals + + // 2001 - 3000 Negative Signals + // Block Series + 2001: optional double block1dLast10Avg + 2002: optional double block1dLast10Max + 2003: optional double block7dLast10Avg + 2004: optional double block7dLast10Max + 2005: optional double block30dLast10Avg + 2006: optional double block30dLast10Max + // Mute Series + 2101: optional double mute1dLast10Avg + 2102: optional double mute1dLast10Max + 2103: 
optional double mute7dLast10Avg + 2104: optional double mute7dLast10Max + 2105: optional double mute30dLast10Avg + 2106: optional double mute30dLast10Max + // Report Series + 2201: optional double report1dLast10Avg + 2202: optional double report1dLast10Max + 2203: optional double report7dLast10Avg + 2204: optional double report7dLast10Max + 2205: optional double report30dLast10Avg + 2206: optional double report30dLast10Max + // Dontlike Series + 2301: optional double dontlike1dLast10Avg + 2302: optional double dontlike1dLast10Max + 2303: optional double dontlike7dLast10Avg + 2304: optional double dontlike7dLast10Max + 2305: optional double dontlike30dLast10Avg + 2306: optional double dontlike30dLast10Max + // SeeFewer Series + 2401: optional double seeFewer1dLast10Avg + 2402: optional double seeFewer1dLast10Max + 2403: optional double seeFewer7dLast10Avg + 2404: optional double seeFewer7dLast10Max + 2405: optional double seeFewer30dLast10Avg + 2406: optional double seeFewer30dLast10Max +}(persisted='true', hasPersonalData = 'true') + +/* + * List score API + */ +struct ListScoreId { + 1: required score.ScoringAlgorithm algorithm + 2: required online_store.ModelVersion modelVersion + 3: required identifier.EmbeddingType targetEmbeddingType + 4: required identifier.InternalId targetId + 5: required identifier.EmbeddingType candidateEmbeddingType + 6: required list candidateIds +}(hasPersonalData = 'true') + +struct ScoreResult { + // This API does not communicate why a score is missing. For example, it may be unavailable + // because the referenced entities do not exist (e.g. the embedding was not found) or because + // timeouts prevented us from calculating it. + 1: optional double score +} + +struct ListScoreResponse { + 1: required list scores // Guaranteed to be the same number/order as requested +} + +struct RecentEngagementSimilaritiesResponse { + 1: required list results // Guaranteed to be the same number/order as requested +}