Delete simclusters-ann directory

This commit is contained in:
dogemanttv 2024-01-10 17:08:38 -06:00 committed by GitHub
parent d52379152a
commit 44886875b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 0 additions and 2022 deletions

View File

@ -1 +0,0 @@
# This prevents SQ query from grabbing //:all since it traverses up once to find a BUILD

View File

@ -1,99 +0,0 @@
# SimClusters ANN
SimClusters ANN is a service that returns tweet candidate recommendations given a SimClusters embedding. The service implements tweet recommendations based on the Approximate Cosine Similarity algorithm.
The cosine similarity between two Tweet SimClusters Embedding represents the relevance level of two tweets in SimCluster space. The traditional algorithm for calculating cosine similarity is expensive and hard to support by the existing infrastructure. Therefore, the Approximate Cosine Similarity algorithm is introduced to save response time by reducing I/O operations.
## Background
SimClusters V2 runtime infra introduces the SimClusters and its online and offline approaches. A heron job builds the mapping between SimClusters and Tweets. The job saves top 400 Tweets for a SimClusters and top 100 SimClusters for a Tweet. Favorite score and follow score are two types of tweet score. In the document, the top 100 SimClusters based on the favorite score for a Tweet stands for the Tweet SimClusters Embedding.
The cosine similarity between two Tweet SimClusters Embedding presents the relevant level of two tweets in SimCluster space. The score varies from 0 to 1. The high cosine similarity score(>= 0.7 in Prod) means that the users who like two tweets share the same SimClusters.
SimClusters from the Linear Algebra Perspective discussed the difference between the dot-product and cosine similarity in SimCluster space. We believe the cosine similarity approach is better because it avoids the bias of tweet popularity.
However, calculating the cosine similarity between two Tweets is pretty expensive in Tweet candidate generation. In TWISTLY, we scan at most 15,000 (6 source tweets * 25 clusters * 100 tweets per clusters) tweet candidates for every Home Timeline request. The traditional algorithm needs to make API calls to fetch 15,000 tweet SimCluster embeddings. Consider that we need to process over 6,000 RPS, its hard to support by the existing infrastructure.
## SimClusters Approximate Cosine Similarity Core Algorithm
1. Provide a source SimCluster Embedding *SV*, *SV = [(SC1, Score), (SC2, Score), (SC3, Score) …]*
2. Fetch top *M* tweets for each Top *N* SimClusters based on SV. In Prod, *M = 400*, *N = 50*. Tweets may appear in multiple SimClusters.
| | | | |
|---|---|---|---|
| SC1 | T1:Score | T2: Score | ... |
| SC2 | T3: Score | T4: Score | ... |
3. Based on the previous table, generate an *(M x N) x N* Matrix *R*. The *R* represents the approximate SimCluster embeddings for *MxN* tweets. The embedding only contains top *N* SimClusters from *SV*. Only top *M* tweets from each SimCluster have the score. Others are 0.
| | SC1 | SC2 | ... |
|---|---|---|---|
| T1 | Score | 0 | ... |
| T2 | Score | 0 | ... |
| T3 | 0 | Score | ... |
4. Compute the dot product between source vector and the approximate vectors for each tweet. (Calculate *R • SV^T*). Take top *X* tweets. In Prod, *X = 200*
5. Fetch *X* tweet SimClusters Embedding, Calculate Cosine Similarity between *X* tweets and *SV*, Return top *Y* above a certain threshold *Z*.
Approximate Cosine Similarity is an approximate algorithm. Instead of fetching *M * N* tweets embedding, it only fetches *X* tweets embedding. In prod, *X / M * N * 100% = 6%*. Based on the metrics during TWISTLY development, most of the response time is consumed by I/O operation. The Approximate Cosine Similarity is a good approach to save a large amount of response time.
The idea of the approximate algorithm is based on the assumption that the higher dot-product between source tweets SimCluster embedding and candidate tweets limited SimCluster Embedding, the possibility that these two tweets are relevant is higher. Additional Cosine Similarity filter is to guarantee that the results are not affected by popularity bias.
Adjusting the M, N, X, Y, Z is able to balance the precision and recall for different products. The implementation of approximate cosine similarity is used by TWISTLY, Interest-based tweet recommendation, Similar Tweet in RUX, and Author based recommendation. This algorithm is also suitable for future user or entity recommendation based on SimClusters Embedding.
# -------------------------------
# Build and Test
# -------------------------------
Compile the service
$ ./bazel build simclusters-ann/server:bin
Unit tests
$ ./bazel test simclusters-ann/server:bin
# -------------------------------
# Deploy
# -------------------------------
## Prerequisite for devel deployments
First of all, you need to generate Service to Service certificates for use while developing locally. This only needs to be done ONCE:
To add cert files to Aurora (if you want to deploy to DEVEL):
```
$ developer-cert-util --env devel --job simclusters-ann
```
## Deploying to devel/staging from a local build
Reference -
$ ./simclusters-ann/bin/deploy.sh --help
Use the script to build the service in your local branch, upload it to packer and deploy in devel aurora:
$ ./simclusters-ann/bin/deploy.sh atla $USER devel simclusters-ann
You can also deploy to staging with this script. E.g. to deploy to instance 1:
$ ./simclusters-ann/bin/deploy.sh atla simclusters-ann staging simclusters-ann <instance-number>
## Deploying to production
Production deploys should be managed by Workflows.
_Do not_ deploy to production unless it is an emergency and you have approval from oncall.
##### It is not recommended to deploy from Command Lines into production environments, unless 1) you're testing a small change in Canary shard [0,9]. 2) Tt is an absolute emergency. Be sure to make oncalls aware of the changes you're deploying.
$ ./simclusters-ann/bin/deploy.sh atla simclusters-ann prod simclusters-ann <instance-number>
In the case of multiple instances,
$ ./simclusters-ann/bin/deploy.sh atla simclusters-ann prod simclusters-ann <instance-number-start>-<instance-number-end>
## Checking Deployed Version and Rolling Back
Wherever possible, roll back using Workflows by finding an earlier good version and clicking the "rollback" button in the UI. This is the safest and least error-prone method.

View File

@ -1,23 +0,0 @@
jvm_binary(
name = "bin",
basename = "simclusters-ann",
main = "com.twitter.simclustersann.SimClustersAnnServerMain",
runtime_platform = "java11",
tags = ["bazel-compatible"],
dependencies = [
"finagle/finagle-zipkin-scribe/src/main/scala",
"finatra/inject/inject-logback/src/main/scala",
"loglens/loglens-logback/src/main/scala/com/twitter/loglens/logback",
"simclusters-ann/server/src/main/scala/com/twitter/simclustersann",
"twitter-server-internal/src/main/scala",
"twitter-server/logback-classic/src/main/scala",
],
)
# Aurora Workflows build phase convention requires a jvm_app named with ${project-name}-app
jvm_app(
name = "simclusters-ann-app",
archive = "zip",
binary = ":bin",
tags = ["bazel-compatible"],
)

View File

@ -1,7 +0,0 @@
resources(
sources = [
"*.xml",
"config/*.yml",
],
tags = ["bazel-compatible"],
)

View File

@ -1,95 +0,0 @@
# SimClusters embedding store enable / disable decider values
# ---------- Dark Traffic Proxy ----------
dark_traffic_filter:
comment: Proportion of the requests that are forwarded as dark traffic to the proxy
default_availability: 0
# Tweet embeddings
enable_LogFavBasedTweet_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_LogFavLongestL2EmbeddingTweet_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
# Entity embeddings
enable_FavTfgTopic_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_LogFavBasedKgoApeTopic_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
# KnownFor embeddings
enable_FavBasedProducer_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_FollowBasedProducer_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_RelaxedAggregatableLogFavBasedProducer_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
# InterestedIn embeddings
enable_LogFavBasedUserInterestedInFromAPE_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_FollowBasedUserInterestedInFromAPE_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_FavBasedUserInterestedIn_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_FollowBasedUserInterestedIn_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_LogFavBasedUserInterestedIn_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_FilteredUserInterestedIn_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_UnfilteredUserInterestedIn_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_LogFavBasedUserInterestedMaxpoolingAddressBookFromIIAPE_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_LogFavBasedUserInterestedAverageAddressBookFromIIAPE_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_LogFavBasedUserInterestedBooktypeMaxpoolingAddressBookFromIIAPE_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_LogFavBasedUserInterestedLargestDimMaxpoolingAddressBookFromIIAPE_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_LogFavBasedUserInterestedLouvainMaxpoolingAddressBookFromIIAPE_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_LogFavBasedUserInterestedConnectedMaxpoolingAddressBookFromIIAPE_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000
enable_UserNextInterestedIn_Model20m145k2020:
comment: "Enable the read traffic to (embeddingType, modelVersion) from 0% to 100%. 0 means return EMPTY for all requests."
default_availability: 10000

View File

@ -1,167 +0,0 @@
<configuration>
<shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
<!-- ===================================================== -->
<!-- Service Config -->
<!-- ===================================================== -->
<property name="DEFAULT_SERVICE_PATTERN"
value="%-16X{traceId} %-12X{clientId:--} %-16X{method} %-25logger{0} %msg"/>
<property name="DEFAULT_ACCESS_PATTERN"
value="%msg"/>
<!-- ===================================================== -->
<!-- Common Config -->
<!-- ===================================================== -->
<!-- JUL/JDK14 to Logback bridge -->
<contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
<resetJUL>true</resetJUL>
</contextListener>
<!-- ====================================================================================== -->
<!-- NOTE: The following appenders use a simple TimeBasedRollingPolicy configuration. -->
<!-- You may want to consider using a more advanced SizeAndTimeBasedRollingPolicy. -->
<!-- See: https://logback.qos.ch/manual/appenders.html#SizeAndTimeBasedRollingPolicy -->
<!-- ====================================================================================== -->
<!-- Service Log (rollover daily, keep maximum of 21 days of gzip compressed logs) -->
<appender name="SERVICE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.service.output}</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- daily rollover -->
<fileNamePattern>${log.service.output}.%d.gz</fileNamePattern>
<!-- the maximum total size of all the log files -->
<totalSizeCap>3GB</totalSizeCap>
<!-- keep maximum 21 days' worth of history -->
<maxHistory>21</maxHistory>
<cleanHistoryOnStart>true</cleanHistoryOnStart>
</rollingPolicy>
<encoder>
<pattern>%date %.-3level ${DEFAULT_SERVICE_PATTERN}%n</pattern>
</encoder>
</appender>
<!-- Access Log (rollover daily, keep maximum of 21 days of gzip compressed logs) -->
<appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.access.output}</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- daily rollover -->
<fileNamePattern>${log.access.output}.%d.gz</fileNamePattern>
<!-- the maximum total size of all the log files -->
<totalSizeCap>100MB</totalSizeCap>
<!-- keep maximum 7 days' worth of history -->
<maxHistory>7</maxHistory>
<cleanHistoryOnStart>true</cleanHistoryOnStart>
</rollingPolicy>
<encoder>
<pattern>${DEFAULT_ACCESS_PATTERN}%n</pattern>
</encoder>
</appender>
<!--LogLens -->
<appender name="LOGLENS" class="com.twitter.loglens.logback.LoglensAppender">
<mdcAdditionalContext>true</mdcAdditionalContext>
<category>${log.lens.category}</category>
<index>${log.lens.index}</index>
<tag>${log.lens.tag}/service</tag>
<encoder>
<pattern>%msg</pattern>
</encoder>
</appender>
<!-- LogLens Access -->
<appender name="LOGLENS-ACCESS" class="com.twitter.loglens.logback.LoglensAppender">
<mdcAdditionalContext>true</mdcAdditionalContext>
<category>${log.lens.category}</category>
<index>${log.lens.index}</index>
<tag>${log.lens.tag}/access</tag>
<encoder>
<pattern>%msg</pattern>
</encoder>
</appender>
<!-- Pipeline Execution Logs -->
<appender name="ALLOW-LISTED-PIPELINE-EXECUTIONS" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>allow_listed_pipeline_executions.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- daily rollover -->
<fileNamePattern>allow_listed_pipeline_executions.log.%d.gz</fileNamePattern>
<!-- the maximum total size of all the log files -->
<totalSizeCap>100MB</totalSizeCap>
<!-- keep maximum 7 days' worth of history -->
<maxHistory>7</maxHistory>
<cleanHistoryOnStart>true</cleanHistoryOnStart>
</rollingPolicy>
<encoder>
<pattern>%date %.-3level ${DEFAULT_SERVICE_PATTERN}%n</pattern>
</encoder>
</appender>
<!-- ===================================================== -->
<!-- Primary Async Appenders -->
<!-- ===================================================== -->
<property name="async_queue_size" value="${queue.size:-50000}"/>
<property name="async_max_flush_time" value="${max.flush.time:-0}"/>
<appender name="ASYNC-SERVICE" class="com.twitter.inject.logback.AsyncAppender">
<queueSize>${async_queue_size}</queueSize>
<maxFlushTime>${async_max_flush_time}</maxFlushTime>
<appender-ref ref="SERVICE"/>
</appender>
<appender name="ASYNC-ACCESS" class="com.twitter.inject.logback.AsyncAppender">
<queueSize>${async_queue_size}</queueSize>
<maxFlushTime>${async_max_flush_time}</maxFlushTime>
<appender-ref ref="ACCESS"/>
</appender>
<appender name="ASYNC-ALLOW-LISTED-PIPELINE-EXECUTIONS" class="com.twitter.inject.logback.AsyncAppender">
<queueSize>${async_queue_size}</queueSize>
<maxFlushTime>${async_max_flush_time}</maxFlushTime>
<appender-ref ref="ALLOW-LISTED-PIPELINE-EXECUTIONS"/>
</appender>
<appender name="ASYNC-LOGLENS" class="com.twitter.inject.logback.AsyncAppender">
<queueSize>${async_queue_size}</queueSize>
<maxFlushTime>${async_max_flush_time}</maxFlushTime>
<appender-ref ref="LOGLENS"/>
</appender>
<appender name="ASYNC-LOGLENS-ACCESS" class="com.twitter.inject.logback.AsyncAppender">
<queueSize>${async_queue_size}</queueSize>
<maxFlushTime>${async_max_flush_time}</maxFlushTime>
<appender-ref ref="LOGLENS-ACCESS"/>
</appender>
<!-- ===================================================== -->
<!-- Package Config -->
<!-- ===================================================== -->
<!-- Per-Package Config -->
<logger name="com.twitter" level="INHERITED"/>
<logger name="com.twitter.wilyns" level="INHERITED"/>
<logger name="com.twitter.configbus.client.file" level="INHERITED"/>
<logger name="com.twitter.finagle.mux" level="INHERITED"/>
<logger name="com.twitter.finagle.serverset2" level="INHERITED"/>
<logger name="com.twitter.logging.ScribeHandler" level="INHERITED"/>
<logger name="com.twitter.zookeeper.client.internal" level="INHERITED"/>
<!-- Disable deadline exceeded logs by default. This can be overriden dynamically in the admin panel of individual instances. -->
<logger name="com.twitter.relevance_platform.common.exceptions.DeadlineExceededExceptionMapper" level="OFF"/>
<!-- Root Config -->
<!-- For all logs except access logs, disable logging below log_level level by default. This can be overriden in the per-package loggers, and dynamically in the admin panel of individual instances. -->
<root level="${log_level:-INFO}">
<appender-ref ref="ASYNC-SERVICE"/>
<appender-ref ref="ASYNC-LOGLENS"/>
</root>
<!-- Access Logging -->
<!-- Access logs are turned off by default -->
<logger name="com.twitter.finatra.thrift.filters.AccessLoggingFilter" level="OFF" additivity="false">
<appender-ref ref="ASYNC-ACCESS"/>
<appender-ref ref="ASYNC-LOGLENS-ACCESS"/>
</logger>
</configuration>

View File

@ -1,31 +0,0 @@
scala_library(
compiler_option_sets = ["fatal_warnings"],
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/javax/inject:javax.inject",
"3rdparty/jvm/net/codingwell:scala-guice",
"finagle/finagle-core/src/main",
"finagle/finagle-http/src/main/scala",
"finagle/finagle-thriftmux/src/main/scala",
"finatra-internal/decider/src/main/scala",
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra/inject/inject-app/src/main/scala",
"finatra/inject/inject-core/src/main/scala",
"finatra/inject/inject-server/src/main/scala",
"finatra/inject/inject-thrift-client/src/main/scala",
"finatra/inject/inject-utils/src/main/scala",
"finatra/utils/src/main/java/com/twitter/finatra/annotations",
"relevance-platform/src/main/scala/com/twitter/relevance_platform/common/exceptions",
"relevance-platform/src/main/scala/com/twitter/relevance_platform/common/filters",
"simclusters-ann/server/src/main/resources",
"simclusters-ann/server/src/main/scala/com/twitter/simclustersann/controllers",
"simclusters-ann/server/src/main/scala/com/twitter/simclustersann/modules",
"simclusters-ann/thrift/src/main/thrift:thrift-scala",
"src/thrift/com/twitter/search:earlybird-scala",
"thrift-web-forms/src/main/scala/com/twitter/thriftwebforms/view",
"twitter-server/server/src/main/scala",
"util/util-app/src/main/scala",
"util/util-core:scala",
],
)

View File

@ -1,70 +0,0 @@
package com.twitter.simclustersann
import com.google.inject.Module
import com.twitter.finatra.decider.modules.DeciderModule
import com.twitter.finatra.mtls.thriftmux.Mtls
import com.twitter.finatra.thrift.ThriftServer
import com.twitter.finatra.thrift.filters._
import com.twitter.finatra.thrift.routing.ThriftRouter
import com.twitter.inject.thrift.modules.ThriftClientIdModule
import com.twitter.relevance_platform.common.exceptions._
import com.twitter.simclustersann.controllers.SimClustersANNController
import com.twitter.simclustersann.exceptions.InvalidRequestForSimClustersAnnVariantExceptionMapper
import com.twitter.simclustersann.modules._
import com.twitter.simclustersann.thriftscala.SimClustersANNService
import com.twitter.finagle.Filter
import com.twitter.finatra.annotations.DarkTrafficFilterType
import com.twitter.inject.annotations.Flags
import com.twitter.relevance_platform.common.filters.DarkTrafficFilterModule
import com.twitter.relevance_platform.common.filters.ClientStatsFilter
import com.twitter.simclustersann.common.FlagNames.DisableWarmup
object SimClustersAnnServerMain extends SimClustersAnnServer
class SimClustersAnnServer extends ThriftServer with Mtls {
flag(
name = DisableWarmup,
default = false,
help = "If true, no warmup will be run."
)
override val name = "simclusters-ann-server"
override val modules: Seq[Module] = Seq(
CacheModule,
ServiceNameMapperModule,
ClusterConfigMapperModule,
ClusterConfigModule,
ClusterTweetIndexProviderModule,
DeciderModule,
EmbeddingStoreModule,
FlagsModule,
FuturePoolProvider,
RateLimiterModule,
SimClustersANNCandidateSourceModule,
StratoClientProviderModule,
ThriftClientIdModule,
new CustomMtlsThriftWebFormsModule[SimClustersANNService.MethodPerEndpoint](this),
new DarkTrafficFilterModule[SimClustersANNService.ReqRepServicePerEndpoint]()
)
def configureThrift(router: ThriftRouter): Unit = {
router
.filter[LoggingMDCFilter]
.filter[TraceIdMDCFilter]
.filter[ThriftMDCFilter]
.filter[ClientStatsFilter]
.filter[ExceptionMappingFilter]
.filter[Filter.TypeAgnostic, DarkTrafficFilterType]
.exceptionMapper[InvalidRequestForSimClustersAnnVariantExceptionMapper]
.exceptionMapper[DeadlineExceededExceptionMapper]
.exceptionMapper[UnhandledExceptionMapper]
.add[SimClustersANNController]
}
override protected def warmup(): Unit = {
if (!injector.instance[Boolean](Flags.named(DisableWarmup))) {
handle[SimclustersAnnWarmupHandler]()
}
}
}

View File

@ -1,73 +0,0 @@
package com.twitter.simclustersann
import com.twitter.inject.Logging
import com.twitter.inject.utils.Handler
import javax.inject.Inject
import scala.util.control.NonFatal
import com.google.common.util.concurrent.RateLimiter
import com.twitter.conversions.DurationOps.richDurationFromInt
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.simclusters_v2.common.ClusterId
import com.twitter.simclusters_v2.common.TweetId
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Await
import com.twitter.util.ExecutorServiceFuturePool
import com.twitter.util.Future
class SimclustersAnnWarmupHandler @Inject() (
clusterTweetCandidatesStore: ReadableStore[ClusterId, Seq[(TweetId, Double)]],
futurePool: ExecutorServiceFuturePool,
rateLimiter: RateLimiter,
statsReceiver: StatsReceiver)
extends Handler
with Logging {
private val stats = statsReceiver.scope(this.getClass.getName)
private val scopedStats = stats.scope("fetchFromCache")
private val clusters = scopedStats.counter("clusters")
private val fetchedKeys = scopedStats.counter("keys")
private val failures = scopedStats.counter("failures")
private val success = scopedStats.counter("success")
private val SimclustersNumber = 144428
override def handle(): Unit = {
try {
val clusterIds = List.range(1, SimclustersNumber)
val futures: Seq[Future[Unit]] = clusterIds
.map { clusterId =>
clusters.incr()
futurePool {
rateLimiter.acquire()
Await.result(
clusterTweetCandidatesStore
.get(clusterId)
.onSuccess { _ =>
success.incr()
}
.handle {
case NonFatal(e) =>
failures.incr()
},
timeout = 10.seconds
)
fetchedKeys.incr()
}
}
Await.result(Future.collect(futures), timeout = 10.minutes)
} catch {
case NonFatal(e) => error(e.getMessage, e)
} finally {
try {
futurePool.executor.shutdown()
} catch {
case NonFatal(_) =>
}
info("Warmup done.")
}
}
}

View File

@ -1,129 +0,0 @@
package com.twitter.simclustersann.candidate_source
import com.twitter.simclusters_v2.common.ClusterId
import com.twitter.simclusters_v2.common.SimClustersEmbedding
import com.twitter.simclusters_v2.common.TweetId
import com.twitter.simclusters_v2.thriftscala.InternalId
import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
import com.twitter.simclustersann.thriftscala.ScoringAlgorithm
import com.twitter.simclustersann.thriftscala.SimClustersANNConfig
import com.twitter.snowflake.id.SnowflakeId
import com.twitter.util.Duration
import com.twitter.util.Time
import scala.collection.mutable
/**
* This store looks for tweets whose similarity is close to a Source SimClustersEmbeddingId.
*
* Approximate cosine similarity is the core algorithm to drive this store.
*
* Step 1 - 4 are in "fetchCandidates" method.
* 1. Retrieve the SimClusters Embedding by the SimClustersEmbeddingId
* 2. Fetch top N clusters' top tweets from the clusterTweetCandidatesStore (TopTweetsPerCluster index).
* 3. Calculate all the tweet candidates' dot-product or approximate cosine similarity to source tweets.
* 4. Take top M tweet candidates by the step 3's score
*/
trait ApproximateCosineSimilarity {
type ScoredTweet = (Long, Double)
def apply(
sourceEmbedding: SimClustersEmbedding,
sourceEmbeddingId: SimClustersEmbeddingId,
config: SimClustersANNConfig,
candidateScoresStat: Int => Unit,
clusterTweetsMap: Map[ClusterId, Option[Seq[(TweetId, Double)]]],
clusterTweetsMapArray: Map[ClusterId, Option[Array[(TweetId, Double)]]] = Map.empty
): Seq[ScoredTweet]
}
object ApproximateCosineSimilarity extends ApproximateCosineSimilarity {
final val InitialCandidateMapSize = 16384
val MaxNumResultsUpperBound = 1000
final val MaxTweetCandidateAgeUpperBound = 175200
private class HashMap[A, B](initSize: Int) extends mutable.HashMap[A, B] {
override def initialSize: Int = initSize // 16 - by default
}
private def parseTweetId(embeddingId: SimClustersEmbeddingId): Option[TweetId] = {
embeddingId.internalId match {
case InternalId.TweetId(tweetId) =>
Some(tweetId)
case _ =>
None
}
}
override def apply(
sourceEmbedding: SimClustersEmbedding,
sourceEmbeddingId: SimClustersEmbeddingId,
config: SimClustersANNConfig,
candidateScoresStat: Int => Unit,
clusterTweetsMap: Map[ClusterId, Option[Seq[(TweetId, Double)]]] = Map.empty,
clusterTweetsMapArray: Map[ClusterId, Option[Array[(TweetId, Double)]]] = Map.empty
): Seq[ScoredTweet] = {
val now = Time.now
val earliestTweetId =
if (config.maxTweetCandidateAgeHours >= MaxTweetCandidateAgeUpperBound)
0L // Disable max tweet age filter
else
SnowflakeId.firstIdFor(now - Duration.fromHours(config.maxTweetCandidateAgeHours))
val latestTweetId =
SnowflakeId.firstIdFor(now - Duration.fromHours(config.minTweetCandidateAgeHours))
// Use Mutable map to optimize performance. The method is thread-safe.
// Set initial map size to around p75 of map size distribution to avoid too many copying
// from extending the size of the mutable hashmap
val candidateScoresMap =
new HashMap[TweetId, Double](InitialCandidateMapSize)
val candidateNormalizationMap =
new HashMap[TweetId, Double](InitialCandidateMapSize)
clusterTweetsMap.foreach {
case (clusterId, Some(tweetScores)) if sourceEmbedding.contains(clusterId) =>
val sourceClusterScore = sourceEmbedding.getOrElse(clusterId)
for (i <- 0 until Math.min(tweetScores.size, config.maxTopTweetsPerCluster)) {
val (tweetId, score) = tweetScores(i)
if (!parseTweetId(sourceEmbeddingId).contains(tweetId) &&
tweetId >= earliestTweetId && tweetId <= latestTweetId) {
candidateScoresMap.put(
tweetId,
candidateScoresMap.getOrElse(tweetId, 0.0) + score * sourceClusterScore)
candidateNormalizationMap
.put(tweetId, candidateNormalizationMap.getOrElse(tweetId, 0.0) + score * score)
}
}
case _ => ()
}
candidateScoresStat(candidateScoresMap.size)
// Re-Rank the candidate by configuration
val processedCandidateScores: Seq[(TweetId, Double)] = candidateScoresMap.map {
case (candidateId, score) =>
// Enable Partial Normalization
val processedScore = {
// We applied the "log" version of partial normalization when we rank candidates
// by log cosine similarity
config.annAlgorithm match {
case ScoringAlgorithm.LogCosineSimilarity =>
score / sourceEmbedding.logNorm / math.log(1 + candidateNormalizationMap(candidateId))
case ScoringAlgorithm.CosineSimilarity =>
score / sourceEmbedding.l2norm / math.sqrt(candidateNormalizationMap(candidateId))
case ScoringAlgorithm.CosineSimilarityNoSourceEmbeddingNormalization =>
score / math.sqrt(candidateNormalizationMap(candidateId))
case ScoringAlgorithm.DotProduct => score
}
}
candidateId -> processedScore
}.toSeq
processedCandidateScores
.filter(_._2 >= config.minScore)
.sortBy(-_._2)
.take(Math.min(config.maxNumResults, MaxNumResultsUpperBound))
}
}

View File

@ -1,14 +0,0 @@
scala_library(
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/google/guava",
"3rdparty/jvm/com/twitter/storehaus:core",
"frigate/frigate-common:base",
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/base",
"simclusters-ann/thrift/src/main/thrift:thrift-scala",
"src/scala/com/twitter/simclusters_v2/common",
"src/scala/com/twitter/simclusters_v2/summingbird/stores",
"src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala",
"util/util-stats/src/main/scala/com/twitter/finagle/stats",
],
)

View File

@ -1,131 +0,0 @@
package com.twitter.simclustersann.candidate_source
import com.twitter.simclusters_v2.common.SimClustersEmbedding
import com.twitter.simclusters_v2.common.TweetId
import com.twitter.simclusters_v2.thriftscala.InternalId
import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
import com.twitter.simclustersann.thriftscala.ScoringAlgorithm
import com.twitter.simclustersann.thriftscala.SimClustersANNConfig
import com.twitter.snowflake.id.SnowflakeId
import com.twitter.util.Duration
import com.twitter.util.Time
import com.google.common.collect.Comparators
import com.twitter.simclusters_v2.common.ClusterId
/**
* A modified version of OptimizedApproximateCosineSimilarity which uses more java streams to avoid
* materializing intermediate collections. Its performance is still under investigation.
*/
object ExperimentalApproximateCosineSimilarity extends ApproximateCosineSimilarity {
final val InitialCandidateMapSize = 16384
val MaxNumResultsUpperBound = 1000
final val MaxTweetCandidateAgeUpperBound = 175200
private def parseTweetId(embeddingId: SimClustersEmbeddingId): Option[TweetId] = {
embeddingId.internalId match {
case InternalId.TweetId(tweetId) =>
Some(tweetId)
case _ =>
None
}
}
private val CompareByScore: java.util.Comparator[(Long, Double)] =
new java.util.Comparator[(Long, Double)] {
override def compare(o1: (Long, Double), o2: (Long, Double)): Int = {
java.lang.Double.compare(o1._2, o2._2)
}
}
class Scores(var score: Double, var norm: Double)
override def apply(
sourceEmbedding: SimClustersEmbedding,
sourceEmbeddingId: SimClustersEmbeddingId,
config: SimClustersANNConfig,
candidateScoresStat: Int => Unit,
clusterTweetsMap: Map[ClusterId, Option[Seq[(TweetId, Double)]]] = Map.empty,
clusterTweetsMapArray: Map[ClusterId, Option[Array[(TweetId, Double)]]] = Map.empty
): Seq[ScoredTweet] = {
val now = Time.now
val earliestTweetId =
if (config.maxTweetCandidateAgeHours >= MaxTweetCandidateAgeUpperBound)
0L // Disable max tweet age filter
else
SnowflakeId.firstIdFor(now - Duration.fromHours(config.maxTweetCandidateAgeHours))
val latestTweetId =
SnowflakeId.firstIdFor(now - Duration.fromHours(config.minTweetCandidateAgeHours))
val candidateScoresMap = new java.util.HashMap[Long, Scores](InitialCandidateMapSize)
val sourceTweetId = parseTweetId(sourceEmbeddingId).getOrElse(0L)
clusterTweetsMap.foreach {
case (clusterId, Some(tweetScores)) =>
val sourceClusterScore = sourceEmbedding.getOrElse(clusterId)
for (i <- 0 until Math.min(tweetScores.size, config.maxTopTweetsPerCluster)) {
val (tweetId, score) = tweetScores(i)
if (tweetId >= earliestTweetId &&
tweetId <= latestTweetId &&
tweetId != sourceTweetId) {
val scores = candidateScoresMap.get(tweetId)
if (scores == null) {
val scorePair = new Scores(
score = score * sourceClusterScore,
norm = score * score
)
candidateScoresMap.put(tweetId, scorePair)
} else {
scores.score = scores.score + (score * sourceClusterScore)
scores.norm = scores.norm + (score * score)
}
}
}
case _ => ()
}
candidateScoresStat(candidateScoresMap.size)
val normFn: (Long, Scores) => (Long, Double) = config.annAlgorithm match {
case ScoringAlgorithm.LogCosineSimilarity =>
(candidateId: Long, score: Scores) =>
(
candidateId,
score.score / sourceEmbedding.logNorm / math.log(1 + score.norm)
)
case ScoringAlgorithm.CosineSimilarity =>
(candidateId: Long, score: Scores) =>
(
candidateId,
score.score / sourceEmbedding.l2norm / math.sqrt(score.norm)
)
case ScoringAlgorithm.CosineSimilarityNoSourceEmbeddingNormalization =>
(candidateId: Long, score: Scores) =>
(
candidateId,
score.score / math.sqrt(score.norm)
)
case ScoringAlgorithm.DotProduct =>
(candidateId: Long, score: Scores) =>
(
candidateId,
score.score
)
}
import scala.collection.JavaConverters._
val topKCollector = Comparators.greatest(
Math.min(config.maxNumResults, MaxNumResultsUpperBound),
CompareByScore
)
candidateScoresMap
.entrySet().stream()
.map[(Long, Double)]((e: java.util.Map.Entry[Long, Scores]) => normFn(e.getKey, e.getValue))
.filter((s: (Long, Double)) => s._2 >= config.minScore)
.collect(topKCollector)
.asScala
}
}

View File

@ -1,112 +0,0 @@
package com.twitter.simclustersann.candidate_source
import com.twitter.simclusters_v2.common.ClusterId
import com.twitter.simclusters_v2.common.SimClustersEmbedding
import com.twitter.simclusters_v2.common.TweetId
import com.twitter.simclusters_v2.thriftscala.InternalId
import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
import com.twitter.simclustersann.thriftscala.ScoringAlgorithm
import com.twitter.simclustersann.thriftscala.SimClustersANNConfig
import com.twitter.snowflake.id.SnowflakeId
import com.twitter.util.Duration
import com.twitter.util.Time
/**
* Compared with ApproximateCosineSimilarity, this implementation:
* - moves some computation aroudn to reduce allocations
* - uses a single hashmap to store both scores and normalization coefficients
* - uses some java collections in place of scala ones
* Testing is still in progress, but this implementation shows significant (> 2x) improvements in
* CPU utilization and allocations with 800 tweets per cluster.
*/
object OptimizedApproximateCosineSimilarity extends ApproximateCosineSimilarity {
final val InitialCandidateMapSize = 16384
val MaxNumResultsUpperBound = 1000
final val MaxTweetCandidateAgeUpperBound = 175200
private def parseTweetId(embeddingId: SimClustersEmbeddingId): Option[TweetId] = {
embeddingId.internalId match {
case InternalId.TweetId(tweetId) =>
Some(tweetId)
case _ =>
None
}
}
override def apply(
sourceEmbedding: SimClustersEmbedding,
sourceEmbeddingId: SimClustersEmbeddingId,
config: SimClustersANNConfig,
candidateScoresStat: Int => Unit,
clusterTweetsMap: Map[ClusterId, Option[Seq[(TweetId, Double)]]] = Map.empty,
clusterTweetsMapArray: Map[ClusterId, Option[Array[(TweetId, Double)]]] = Map.empty
): Seq[ScoredTweet] = {
val now = Time.now
val earliestTweetId =
if (config.maxTweetCandidateAgeHours >= MaxTweetCandidateAgeUpperBound)
0L // Disable max tweet age filter
else
SnowflakeId.firstIdFor(now - Duration.fromHours(config.maxTweetCandidateAgeHours))
val latestTweetId =
SnowflakeId.firstIdFor(now - Duration.fromHours(config.minTweetCandidateAgeHours))
val candidateScoresMap = new java.util.HashMap[Long, (Double, Double)](InitialCandidateMapSize)
val sourceTweetId = parseTweetId(sourceEmbeddingId).getOrElse(0L)
clusterTweetsMap.foreach {
case (clusterId, Some(tweetScores)) if sourceEmbedding.contains(clusterId) =>
val sourceClusterScore = sourceEmbedding.getOrElse(clusterId)
for (i <- 0 until Math.min(tweetScores.size, config.maxTopTweetsPerCluster)) {
val (tweetId, score) = tweetScores(i)
if (tweetId >= earliestTweetId &&
tweetId <= latestTweetId &&
tweetId != sourceTweetId) {
val scores = candidateScoresMap.getOrDefault(tweetId, (0.0, 0.0))
val newScores = (
scores._1 + score * sourceClusterScore,
scores._2 + score * score,
)
candidateScoresMap.put(tweetId, newScores)
}
}
case _ => ()
}
candidateScoresStat(candidateScoresMap.size)
val normFn: (Long, (Double, Double)) => (Long, Double) = config.annAlgorithm match {
case ScoringAlgorithm.LogCosineSimilarity =>
(candidateId: Long, score: (Double, Double)) =>
candidateId -> score._1 / sourceEmbedding.logNorm / math.log(1 + score._2)
case ScoringAlgorithm.CosineSimilarity =>
(candidateId: Long, score: (Double, Double)) =>
candidateId -> score._1 / sourceEmbedding.l2norm / math.sqrt(score._2)
case ScoringAlgorithm.CosineSimilarityNoSourceEmbeddingNormalization =>
(candidateId: Long, score: (Double, Double)) =>
candidateId -> score._1 / math.sqrt(score._2)
case ScoringAlgorithm.DotProduct =>
(candidateId: Long, score: (Double, Double)) => (candidateId, score._1)
}
val scoredTweets: java.util.ArrayList[(Long, Double)] =
new java.util.ArrayList(candidateScoresMap.size)
val it = candidateScoresMap.entrySet().iterator()
while (it.hasNext) {
val mapEntry = it.next()
val normedScore = normFn(mapEntry.getKey, mapEntry.getValue)
if (normedScore._2 >= config.minScore)
scoredTweets.add(normedScore)
}
import scala.collection.JavaConverters._
scoredTweets.asScala
.sortBy(-_._2)
.take(Math.min(config.maxNumResults, MaxNumResultsUpperBound))
}
}

View File

@ -1,102 +0,0 @@
package com.twitter.simclustersann.candidate_source
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.frigate.common.base.Stats
import com.twitter.simclusters_v2.common.ClusterId
import com.twitter.simclusters_v2.common.SimClustersEmbedding
import com.twitter.simclusters_v2.common.TweetId
import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
import com.twitter.simclustersann.thriftscala.SimClustersANNConfig
import com.twitter.simclustersann.thriftscala.SimClustersANNTweetCandidate
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Future
/**
* This store looks for tweets whose similarity is close to a Source SimClustersEmbeddingId.
*
* Approximate cosine similarity is the core algorithm to drive this store.
*
* Step 1 - 4 are in "fetchCandidates" method.
* 1. Retrieve the SimClusters Embedding by the SimClustersEmbeddingId
* 2. Fetch top N clusters' top tweets from the clusterTweetCandidatesStore (TopTweetsPerCluster index).
* 3. Calculate all the tweet candidates' dot-product or approximate cosine similarity to source tweets.
* 4. Take top M tweet candidates by the step 3's score
*/
case class SimClustersANNCandidateSource(
approximateCosineSimilarity: ApproximateCosineSimilarity,
clusterTweetCandidatesStore: ReadableStore[ClusterId, Seq[(TweetId, Double)]],
simClustersEmbeddingStore: ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding],
statsReceiver: StatsReceiver) {
private val stats = statsReceiver.scope(this.getClass.getName)
private val fetchSourceEmbeddingStat = stats.scope("fetchSourceEmbedding")
private val fetchCandidatesStat = stats.scope("fetchCandidates")
private val candidateScoresStat = stats.stat("candidateScoresMap")
def get(
query: SimClustersANNCandidateSource.Query
): Future[Option[Seq[SimClustersANNTweetCandidate]]] = {
val sourceEmbeddingId = query.sourceEmbeddingId
val config = query.config
for {
maybeSimClustersEmbedding <- Stats.track(fetchSourceEmbeddingStat) {
simClustersEmbeddingStore.get(query.sourceEmbeddingId)
}
maybeFilteredCandidates <- maybeSimClustersEmbedding match {
case Some(sourceEmbedding) =>
for {
candidates <- Stats.trackSeq(fetchCandidatesStat) {
fetchCandidates(sourceEmbeddingId, sourceEmbedding, config)
}
} yield {
fetchCandidatesStat
.stat(sourceEmbeddingId.embeddingType.name, sourceEmbeddingId.modelVersion.name).add(
candidates.size)
Some(candidates)
}
case None =>
fetchCandidatesStat
.stat(sourceEmbeddingId.embeddingType.name, sourceEmbeddingId.modelVersion.name).add(0)
Future.None
}
} yield {
maybeFilteredCandidates
}
}
private def fetchCandidates(
sourceEmbeddingId: SimClustersEmbeddingId,
sourceEmbedding: SimClustersEmbedding,
config: SimClustersANNConfig
): Future[Seq[SimClustersANNTweetCandidate]] = {
val clusterIds =
sourceEmbedding
.truncate(config.maxScanClusters).getClusterIds()
.toSet
Future
.collect {
clusterTweetCandidatesStore.multiGet(clusterIds)
}.map { clusterTweetsMap =>
approximateCosineSimilarity(
sourceEmbedding = sourceEmbedding,
sourceEmbeddingId = sourceEmbeddingId,
config = config,
candidateScoresStat = (i: Int) => candidateScoresStat.add(i),
clusterTweetsMap = clusterTweetsMap
).map {
case (tweetId, score) =>
SimClustersANNTweetCandidate(
tweetId = tweetId,
score = score
)
}
}
}
}
object SimClustersANNCandidateSource {
case class Query(
sourceEmbeddingId: SimClustersEmbeddingId,
config: SimClustersANNConfig)
}

View File

@ -1,5 +0,0 @@
scala_library(
compiler_option_sets = ["fatal_warnings"],
tags = ["bazel-compatible"],
dependencies = [],
)

View File

@ -1,31 +0,0 @@
package com.twitter.simclustersann.common
object FlagNames {
/**
* Global Settings
*/
final val ServiceTimeout = "service.timeout"
final val DarkTrafficFilterDeciderKey = "thrift.dark.traffic.filter.decider_key"
/**
* Cache Setting
*/
final val CacheDest = "cache_module.dest"
final val CacheTimeout = "cache_module.timeout"
// Only turn on the async update when the SANN Cluster has the production taffic.
final val CacheAsyncUpdate = "cache_module.async_update"
/**
* Warmup Settings
*/
final val DisableWarmup = "warmup.disable"
final val NumberOfThreads = "warmup.thread_number"
final val RateLimiterQPS = "warmup.rate_limiter_qps"
/**
* Algorithm Parameters
*/
final val MaxTopTweetPerCluster = "sim_clusters.ann.max_top_tweets_per_cluster"
}

View File

@ -1,29 +0,0 @@
scala_library(
compiler_option_sets = ["fatal_warnings"],
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/javax/inject:javax.inject",
"3rdparty/jvm/net/codingwell:scala-guice",
"decider/src/main/scala",
"finagle/finagle-core/src/main",
"finatra/inject/inject-core/src/main/scala",
"finatra/thrift/src/main/scala/com/twitter/finatra/thrift",
"finatra/thrift/src/main/scala/com/twitter/finatra/thrift:controller",
"finatra/thrift/src/main/scala/com/twitter/finatra/thrift/exceptions",
"finatra/thrift/src/main/scala/com/twitter/finatra/thrift/filters",
"finatra/thrift/src/main/scala/com/twitter/finatra/thrift/modules",
"finatra/thrift/src/main/scala/com/twitter/finatra/thrift/response",
"finatra/thrift/src/main/scala/com/twitter/finatra/thrift/routing",
"representation-manager/server/src/main/scala/com/twitter/representation_manager/migration",
"scrooge/scrooge-core/src/main/scala",
"simclusters-ann/server/src/main/scala/com/twitter/simclustersann/candidate_source",
"simclusters-ann/server/src/main/scala/com/twitter/simclustersann/common",
"simclusters-ann/server/src/main/scala/com/twitter/simclustersann/filters",
"simclusters-ann/thrift/src/main/thrift:thrift-scala",
"src/scala/com/twitter/simclusters_v2/candidate_source",
"twitter-server/server/src/main/scala",
"util/util-core:scala",
"util/util-slf4j-api/src/main/scala",
],
)

View File

@ -1,80 +0,0 @@
package com.twitter.simclustersann.controllers
import com.twitter.conversions.DurationOps._
import com.twitter.finatra.thrift.Controller
import com.twitter.simclustersann.thriftscala.SimClustersANNService.GetTweetCandidates
import com.twitter.simclustersann.thriftscala.SimClustersANNService
import com.twitter.simclustersann.thriftscala.Query
import com.twitter.simclustersann.thriftscala.SimClustersANNTweetCandidate
import com.twitter.scrooge.Request
import com.twitter.scrooge.Response
import javax.inject.Inject
import com.twitter.finagle.Service
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.inject.annotations.Flag
import com.twitter.simclustersann.candidate_source.{
SimClustersANNCandidateSource => SANNSimClustersANNCandidateSource
}
import com.twitter.simclustersann.common.FlagNames
import com.twitter.simclustersann.filters.GetTweetCandidatesResponseStatsFilter
import com.twitter.simclustersann.filters.SimClustersAnnVariantFilter
import com.twitter.util.Future
import com.twitter.util.JavaTimer
import com.twitter.util.Timer
class SimClustersANNController @Inject() (
@Flag(FlagNames.ServiceTimeout) serviceTimeout: Int,
variantFilter: SimClustersAnnVariantFilter,
getTweetCandidatesResponseStatsFilter: GetTweetCandidatesResponseStatsFilter,
sannCandidateSource: SANNSimClustersANNCandidateSource,
globalStats: StatsReceiver)
extends Controller(SimClustersANNService) {
import SimClustersANNController._
private val stats: StatsReceiver = globalStats.scope(this.getClass.getCanonicalName)
private val timer: Timer = new JavaTimer(true)
val filteredService: Service[Request[GetTweetCandidates.Args], Response[
Seq[SimClustersANNTweetCandidate]
]] = {
variantFilter
.andThen(getTweetCandidatesResponseStatsFilter)
.andThen(Service.mk(handler))
}
handle(GetTweetCandidates).withService(filteredService)
private def handler(
request: Request[GetTweetCandidates.Args]
): Future[Response[Seq[SimClustersANNTweetCandidate]]] = {
val query: Query = request.args.query
val simClustersANNCandidateSourceQuery = SANNSimClustersANNCandidateSource.Query(
sourceEmbeddingId = query.sourceEmbeddingId,
config = query.config
)
val result = sannCandidateSource
.get(simClustersANNCandidateSourceQuery).map {
case Some(tweetCandidatesSeq) =>
Response(tweetCandidatesSeq.map { tweetCandidate =>
SimClustersANNTweetCandidate(
tweetId = tweetCandidate.tweetId,
score = tweetCandidate.score
)
})
case None =>
DefaultResponse
}
result.raiseWithin(serviceTimeout.milliseconds)(timer).rescue {
case e: Throwable =>
stats.scope("failures").counter(e.getClass.getCanonicalName).incr()
Future.value(DefaultResponse)
}
}
}
object SimClustersANNController {
val DefaultResponse: Response[Seq[SimClustersANNTweetCandidate]] = Response(Seq.empty)
}

View File

@ -1,12 +0,0 @@
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
"finagle/finagle-core/src/main",
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra-internal/thrift/src/main/thrift:thrift-scala",
"src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala",
],
)

View File

@ -1,16 +0,0 @@
package com.twitter.simclustersann.exceptions
import com.twitter.finagle.RequestException
import com.twitter.simclusters_v2.thriftscala.EmbeddingType
import com.twitter.simclusters_v2.thriftscala.ModelVersion
case class InvalidRequestForSimClustersAnnVariantException(
modelVersion: ModelVersion,
embeddingType: EmbeddingType,
actualServiceName: String,
expectedServiceName: Option[String])
extends RequestException(
s"Request with model version ($modelVersion) and embedding type ($embeddingType) cannot be " +
s"processed by service variant ($actualServiceName)." +
s" Expected service variant: $expectedServiceName.",
null)

View File

@ -1,27 +0,0 @@
package com.twitter.simclustersann.exceptions
import com.twitter.finatra.thrift.exceptions.ExceptionMapper
import com.twitter.finatra.thrift.thriftscala.ClientError
import com.twitter.finatra.thrift.thriftscala.ClientErrorCause
import com.twitter.util.Future
import com.twitter.util.logging.Logging
import javax.inject.Singleton
/**
* An exception mapper designed to handle
* [[com.twitter.simclustersann.exceptions.InvalidRequestForSimClustersAnnVariantException]]
* by returning a Thrift IDL defined Client Error.
*/
@Singleton
class InvalidRequestForSimClustersAnnVariantExceptionMapper
extends ExceptionMapper[InvalidRequestForSimClustersAnnVariantException, Nothing]
with Logging {
override def handleException(
throwable: InvalidRequestForSimClustersAnnVariantException
): Future[Nothing] = {
error("Invalid Request For SimClusters Ann Variant Exception", throwable)
Future.exception(ClientError(ClientErrorCause.BadRequest, throwable.getMessage()))
}
}

View File

@ -1,6 +0,0 @@
package com.twitter.simclustersann.exceptions
case class MissingClusterConfigForSimClustersAnnVariantException(sannServiceName: String)
extends IllegalStateException(
s"No cluster configuration found for service ($sannServiceName)",
null)

View File

@ -1,13 +0,0 @@
scala_library(
compiler_option_sets = ["fatal_warnings"],
tags = ["bazel-compatible"],
dependencies = [
"finagle/finagle-core/src/main",
"finatra/inject/inject-app/src/main/java/com/twitter/inject/annotations",
"finatra/inject/inject-core/src/main/scala",
"relevance-platform/src/main/scala/com/twitter/relevance_platform/simclustersann/multicluster",
"scrooge/scrooge-core/src/main/scala",
"simclusters-ann/server/src/main/scala/com/twitter/simclustersann/exceptions",
"simclusters-ann/thrift/src/main/thrift:thrift-scala",
],
)

View File

@ -1,43 +0,0 @@
package com.twitter.simclustersann.filters
import com.twitter.finagle.Service
import com.twitter.finagle.SimpleFilter
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.scrooge.Request
import com.twitter.scrooge.Response
import com.twitter.simclustersann.thriftscala.SimClustersANNService
import com.twitter.util.Future
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class GetTweetCandidatesResponseStatsFilter @Inject() (
statsReceiver: StatsReceiver)
extends SimpleFilter[Request[SimClustersANNService.GetTweetCandidates.Args], Response[
SimClustersANNService.GetTweetCandidates.SuccessType
]] {
private[this] val stats = statsReceiver.scope("method_response_stats").scope("getTweetCandidates")
private[this] val candidateScoreStats = stats.stat("candidate_score_x1000")
private[this] val emptyResponseCounter = stats.counter("empty")
private[this] val nonEmptyResponseCounter = stats.counter("non_empty")
override def apply(
request: Request[SimClustersANNService.GetTweetCandidates.Args],
service: Service[Request[SimClustersANNService.GetTweetCandidates.Args], Response[
SimClustersANNService.GetTweetCandidates.SuccessType
]]
): Future[Response[SimClustersANNService.GetTweetCandidates.SuccessType]] = {
val response = service(request)
response.onSuccess { successResponse =>
if (successResponse.value.size == 0)
emptyResponseCounter.incr()
else
nonEmptyResponseCounter.incr()
successResponse.value.foreach { candidate =>
candidateScoreStats.add(candidate.score.toFloat * 1000)
}
}
response
}
}

View File

@ -1,53 +0,0 @@
package com.twitter.simclustersann.filters
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.Service
import com.twitter.finagle.SimpleFilter
import com.twitter.relevance_platform.simclustersann.multicluster.ServiceNameMapper
import com.twitter.scrooge.Request
import com.twitter.scrooge.Response
import com.twitter.simclustersann.exceptions.InvalidRequestForSimClustersAnnVariantException
import com.twitter.simclustersann.thriftscala.SimClustersANNService
import com.twitter.util.Future
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class SimClustersAnnVariantFilter @Inject() (
serviceNameMapper: ServiceNameMapper,
serviceIdentifier: ServiceIdentifier,
) extends SimpleFilter[Request[SimClustersANNService.GetTweetCandidates.Args], Response[
SimClustersANNService.GetTweetCandidates.SuccessType
]] {
override def apply(
request: Request[SimClustersANNService.GetTweetCandidates.Args],
service: Service[Request[SimClustersANNService.GetTweetCandidates.Args], Response[
SimClustersANNService.GetTweetCandidates.SuccessType
]]
): Future[Response[SimClustersANNService.GetTweetCandidates.SuccessType]] = {
validateRequest(request)
service(request)
}
private def validateRequest(
request: Request[SimClustersANNService.GetTweetCandidates.Args]
): Unit = {
val modelVersion = request.args.query.sourceEmbeddingId.modelVersion
val embeddingType = request.args.query.config.candidateEmbeddingType
val actualServiceName = serviceIdentifier.service
val expectedServiceName = serviceNameMapper.getServiceName(modelVersion, embeddingType)
expectedServiceName match {
case Some(name) if name == actualServiceName => ()
case _ =>
throw InvalidRequestForSimClustersAnnVariantException(
modelVersion,
embeddingType,
actualServiceName,
expectedServiceName)
}
}
}

View File

@ -1,24 +0,0 @@
scala_library(
compiler_option_sets = ["fatal_warnings"],
tags = ["bazel-compatible"],
dependencies = [
"finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/authentication",
"finagle/finagle-stats",
"finatra/inject/inject-core/src/main/scala",
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/strato",
"hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common",
"relevance-platform/src/main/scala/com/twitter/relevance_platform/common/injection",
"relevance-platform/src/main/scala/com/twitter/relevance_platform/common/readablestore",
"relevance-platform/src/main/scala/com/twitter/relevance_platform/simclustersann/multicluster",
"representation-manager/client/src/main/scala/com/twitter/representation_manager",
"simclusters-ann/server/src/main/scala/com/twitter/simclustersann/candidate_source",
"simclusters-ann/server/src/main/scala/com/twitter/simclustersann/common",
"simclusters-ann/server/src/main/scala/com/twitter/simclustersann/exceptions",
"simclusters-ann/thrift/src/main/thrift:thrift-scala",
"src/scala/com/twitter/simclusters_v2/common",
"src/scala/com/twitter/simclusters_v2/summingbird",
"src/scala/com/twitter/storehaus_internal/memcache",
"src/scala/com/twitter/storehaus_internal/util",
"src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala",
],
)

View File

@ -1,34 +0,0 @@
package com.twitter.simclustersann.modules
import com.google.inject.Provides
import com.twitter.finagle.memcached.Client
import javax.inject.Singleton
import com.twitter.conversions.DurationOps._
import com.twitter.inject.TwitterModule
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.inject.annotations.Flag
import com.twitter.simclustersann.common.FlagNames
import com.twitter.storehaus_internal.memcache.MemcacheStore
import com.twitter.storehaus_internal.util.ClientName
import com.twitter.storehaus_internal.util.ZkEndPoint
object CacheModule extends TwitterModule {
@Singleton
@Provides
def providesCache(
@Flag(FlagNames.CacheDest) cacheDest: String,
@Flag(FlagNames.CacheTimeout) cacheTimeout: Int,
serviceIdentifier: ServiceIdentifier,
stats: StatsReceiver
): Client =
MemcacheStore.memcachedClient(
name = ClientName("memcache_simclusters_ann"),
dest = ZkEndPoint(cacheDest),
timeout = cacheTimeout.milliseconds,
retries = 0,
statsReceiver = stats.scope("cache_client"),
serviceIdentifier = serviceIdentifier
)
}

View File

@ -1,15 +0,0 @@
package com.twitter.simclustersann.modules
import com.google.inject.Provides
import com.twitter.inject.TwitterModule
import com.twitter.relevance_platform.simclustersann.multicluster.ClusterConfigMapper
import javax.inject.Singleton
object ClusterConfigMapperModule extends TwitterModule {
@Singleton
@Provides
def providesClusterConfigMapper(
): ClusterConfigMapper = {
ClusterConfigMapper
}
}

View File

@ -1,25 +0,0 @@
package com.twitter.simclustersann.modules
import com.google.inject.Provides
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.inject.TwitterModule
import com.twitter.relevance_platform.simclustersann.multicluster.ClusterConfig
import com.twitter.relevance_platform.simclustersann.multicluster.ClusterConfigMapper
import com.twitter.simclustersann.exceptions.MissingClusterConfigForSimClustersAnnVariantException
import javax.inject.Singleton
object ClusterConfigModule extends TwitterModule {
@Singleton
@Provides
def providesClusterConfig(
serviceIdentifier: ServiceIdentifier,
clusterConfigMapper: ClusterConfigMapper
): ClusterConfig = {
val serviceName = serviceIdentifier.service
clusterConfigMapper.getClusterConfig(serviceName) match {
case Some(config) => config
case None => throw MissingClusterConfigForSimClustersAnnVariantException(serviceName)
}
}
}

View File

@ -1,95 +0,0 @@
package com.twitter.simclustersann.modules
import com.google.inject.Provides
import com.twitter.conversions.DurationOps._
import com.twitter.decider.Decider
import com.twitter.finagle.memcached.Client
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.hermit.store.common.ObservedCachedReadableStore
import com.twitter.hermit.store.common.ObservedMemcachedReadableStore
import com.twitter.inject.TwitterModule
import com.twitter.inject.annotations.Flag
import com.twitter.relevance_platform.common.injection.LZ4Injection
import com.twitter.relevance_platform.common.injection.SeqObjectInjection
import com.twitter.relevance_platform.simclustersann.multicluster.ClusterConfig
import com.twitter.relevance_platform.simclustersann.multicluster.ClusterTweetIndexStoreConfig
import com.twitter.simclusters_v2.common.ClusterId
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.common.TweetId
import com.twitter.simclusters_v2.summingbird.stores.ClusterKey
import com.twitter.simclusters_v2.summingbird.stores.TopKTweetsForClusterKeyReadableStore
import com.twitter.simclusters_v2.thriftscala.EmbeddingType
import com.twitter.simclustersann.common.FlagNames
import com.twitter.storehaus.ReadableStore
import javax.inject.Singleton
object ClusterTweetIndexProviderModule extends TwitterModule {
@Singleton
@Provides
// Provides ClusterTweetIndex Store based on different maxResults settings on the same store
// Create a different provider if index is in a different store
def providesClusterTweetIndex(
@Flag(FlagNames.MaxTopTweetPerCluster) maxTopTweetPerCluster: Int,
@Flag(FlagNames.CacheAsyncUpdate) asyncUpdate: Boolean,
clusterConfig: ClusterConfig,
serviceIdentifier: ServiceIdentifier,
stats: StatsReceiver,
decider: Decider,
simClustersANNCacheClient: Client
): ReadableStore[ClusterId, Seq[(TweetId, Double)]] = {
// Build the underling cluster-to-tweet store
val topTweetsForClusterStore = clusterConfig.clusterTweetIndexStoreConfig match {
// If the config returns Manhattan tweet index config, we read from a RO MH store
case manhattanConfig: ClusterTweetIndexStoreConfig.Manhattan =>
TopKTweetsForClusterKeyReadableStore.getClusterToTopKTweetsStoreFromManhattanRO(
maxTopTweetPerCluster,
manhattanConfig,
serviceIdentifier)
case memCacheConfig: ClusterTweetIndexStoreConfig.Memcached =>
TopKTweetsForClusterKeyReadableStore.getClusterToTopKTweetsStoreFromMemCache(
maxTopTweetPerCluster,
memCacheConfig,
serviceIdentifier)
case _ =>
// Bad instance
ReadableStore.empty
}
val embeddingType: EmbeddingType = clusterConfig.candidateTweetEmbeddingType
val modelVersion: String = ModelVersions.toKnownForModelVersion(clusterConfig.modelVersion)
val store: ReadableStore[ClusterId, Seq[(TweetId, Double)]] =
topTweetsForClusterStore.composeKeyMapping { id: ClusterId =>
ClusterKey(id, modelVersion, embeddingType)
}
val memcachedTopTweetsForClusterStore =
ObservedMemcachedReadableStore.fromCacheClient(
backingStore = store,
cacheClient = simClustersANNCacheClient,
ttl = 15.minutes,
asyncUpdate = asyncUpdate
)(
valueInjection = LZ4Injection.compose(SeqObjectInjection[(Long, Double)]()),
statsReceiver = stats.scope("cluster_tweet_index_mem_cache"),
keyToString = { k =>
// prod cache key : SimClusters_LZ4/cluster_to_tweet/clusterId_embeddingType_modelVersion
s"scz:c2t:${k}_${embeddingType}_${modelVersion}_$maxTopTweetPerCluster"
}
)
val cachedStore: ReadableStore[ClusterId, Seq[(TweetId, Double)]] = {
ObservedCachedReadableStore.from[ClusterId, Seq[(TweetId, Double)]](
memcachedTopTweetsForClusterStore,
ttl = 10.minute,
maxKeys = 150000,
cacheName = "cluster_tweet_index_cache",
windowSize = 10000L
)(stats.scope("cluster_tweet_index_store"))
}
cachedStore
}
}

View File

@ -1,99 +0,0 @@
package com.twitter.simclustersann.modules
import com.twitter.finatra.mtls.thriftmux.modules.MtlsThriftWebFormsModule
import com.twitter.finatra.thrift.ThriftServer
import com.twitter.simclusters_v2.thriftscala.EmbeddingType
import com.twitter.simclusters_v2.thriftscala.InternalId
import com.twitter.simclusters_v2.thriftscala.ModelVersion
import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
import com.twitter.thriftwebforms.MethodOptions
import com.twitter.thriftwebforms.view.ServiceResponseView
import com.twitter.util.Future
import com.twitter.simclustersann.thriftscala.SimClustersANNTweetCandidate
import com.twitter.simclustersann.thriftscala.Query
import com.twitter.simclustersann.thriftscala.SimClustersANNConfig
import com.twitter.simclustersann.thriftscala.ScoringAlgorithm
import com.twitter.thriftwebforms.MethodOptions.Access
import scala.reflect.ClassTag
import com.twitter.simclustersann.thriftscala.SimClustersANNService
import scala.collection.mutable
class CustomMtlsThriftWebFormsModule[T: ClassTag](server: ThriftServer)
extends MtlsThriftWebFormsModule[T](server: ThriftServer) {
private val Nbsp = "&nbsp;"
private val LdapGroups = Seq("recosplat-sensitive-data-medium", "simclusters-ann-admins")
override protected def methodOptions: Map[String, MethodOptions] = {
val tweetId = 1568796529690902529L
val sannDefaultQuery = SimClustersANNService.GetTweetCandidates.Args(
query = Query(
sourceEmbeddingId = SimClustersEmbeddingId(
embeddingType = EmbeddingType.LogFavLongestL2EmbeddingTweet,
modelVersion = ModelVersion.Model20m145k2020,
internalId = InternalId.TweetId(tweetId)
),
config = SimClustersANNConfig(
maxNumResults = 10,
minScore = 0.0,
candidateEmbeddingType = EmbeddingType.LogFavBasedTweet,
maxTopTweetsPerCluster = 400,
maxScanClusters = 50,
maxTweetCandidateAgeHours = 24,
minTweetCandidateAgeHours = 0,
annAlgorithm = ScoringAlgorithm.CosineSimilarity
)
))
Seq("getTweetCandidates")
.map(
_ -> MethodOptions(
defaultRequestValue = Some(sannDefaultQuery),
responseRenderers = Seq(renderTimeline),
allowedAccessOverride = Some(Access.ByLdapGroup(LdapGroups))
)).toMap
}
val FullAccessLdapGroups: Seq[String] =
Seq(
"recosplat-sensitive-data-medium",
"simclusters-ann-admins",
"recos-platform-admins"
)
override protected def defaultMethodAccess: MethodOptions.Access = {
MethodOptions.Access.ByLdapGroup(FullAccessLdapGroups)
}
def renderTimeline(r: AnyRef): Future[ServiceResponseView] = {
val simClustersANNTweetCandidates = r match {
case response: Iterable[_] =>
response.map(x => x.asInstanceOf[SimClustersANNTweetCandidate]).toSeq
case _ => Seq()
}
renderTweets(simClustersANNTweetCandidates)
}
private def renderTweets(
simClustersANNTweetCandidates: Seq[SimClustersANNTweetCandidate]
): Future[ServiceResponseView] = {
val htmlSb = new mutable.StringBuilder()
val headerHtml = s"""<h3>Tweet Candidates</h3>"""
val tweetsHtml = simClustersANNTweetCandidates.map { simClustersANNTweetCandidate =>
val tweetId = simClustersANNTweetCandidate.tweetId
val score = simClustersANNTweetCandidate.score
s"""<blockquote class="twitter-tweet"><a href="https://twitter.com/tweet/statuses/$tweetId"></a></blockquote> <b>score:</b> $score <br><br>"""
}.mkString
htmlSb ++= headerHtml
htmlSb ++= Nbsp
htmlSb ++= tweetsHtml
Future.value(
ServiceResponseView(
"SimClusters ANN Tweet Candidates",
htmlSb.toString(),
Seq("//platform.twitter.com/widgets.js")
)
)
}
}

View File

@ -1,110 +0,0 @@
package com.twitter.simclustersann.modules
import com.google.inject.Provides
import com.twitter.decider.Decider
import com.twitter.finagle.memcached.{Client => MemcachedClient}
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.inject.TwitterModule
import com.twitter.representation_manager.StoreBuilder
import com.twitter.representation_manager.config.{
DefaultClientConfig => RepresentationManagerDefaultClientConfig
}
import com.twitter.representation_manager.thriftscala.SimClustersEmbeddingView
import com.twitter.simclusters_v2.common.SimClustersEmbedding
import com.twitter.simclusters_v2.stores.SimClustersEmbeddingStore
import com.twitter.simclusters_v2.thriftscala.EmbeddingType
import com.twitter.simclusters_v2.thriftscala.EmbeddingType._
import com.twitter.simclusters_v2.thriftscala.ModelVersion
import com.twitter.simclusters_v2.thriftscala.ModelVersion._
import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
import com.twitter.storehaus.ReadableStore
import com.twitter.strato.client.{Client => StratoClient}
import javax.inject.Singleton
object EmbeddingStoreModule extends TwitterModule {
val TweetEmbeddings: Set[SimClustersEmbeddingView] = Set(
SimClustersEmbeddingView(LogFavLongestL2EmbeddingTweet, Model20m145kUpdated),
SimClustersEmbeddingView(LogFavLongestL2EmbeddingTweet, Model20m145k2020)
)
val UserEmbeddings: Set[SimClustersEmbeddingView] = Set(
// KnownFor
SimClustersEmbeddingView(FavBasedProducer, Model20m145kUpdated),
SimClustersEmbeddingView(FavBasedProducer, Model20m145k2020),
SimClustersEmbeddingView(FollowBasedProducer, Model20m145k2020),
SimClustersEmbeddingView(AggregatableLogFavBasedProducer, Model20m145k2020),
// InterestedIn
SimClustersEmbeddingView(UnfilteredUserInterestedIn, Model20m145k2020),
SimClustersEmbeddingView(
LogFavBasedUserInterestedMaxpoolingAddressBookFromIIAPE,
Model20m145k2020),
SimClustersEmbeddingView(
LogFavBasedUserInterestedAverageAddressBookFromIIAPE,
Model20m145k2020),
SimClustersEmbeddingView(
LogFavBasedUserInterestedBooktypeMaxpoolingAddressBookFromIIAPE,
Model20m145k2020),
SimClustersEmbeddingView(
LogFavBasedUserInterestedLargestDimMaxpoolingAddressBookFromIIAPE,
Model20m145k2020),
SimClustersEmbeddingView(
LogFavBasedUserInterestedLouvainMaxpoolingAddressBookFromIIAPE,
Model20m145k2020),
SimClustersEmbeddingView(
LogFavBasedUserInterestedConnectedMaxpoolingAddressBookFromIIAPE,
Model20m145k2020),
SimClustersEmbeddingView(UserNextInterestedIn, Model20m145k2020),
SimClustersEmbeddingView(LogFavBasedUserInterestedInFromAPE, Model20m145k2020)
)
@Singleton
@Provides
def providesEmbeddingStore(
stratoClient: StratoClient,
memCachedClient: MemcachedClient,
decider: Decider,
stats: StatsReceiver
): ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding] = {
val rmsStoreBuilder = new StoreBuilder(
clientConfig = RepresentationManagerDefaultClientConfig,
stratoClient = stratoClient,
memCachedClient = memCachedClient,
globalStats = stats,
)
val underlyingStores: Map[
(EmbeddingType, ModelVersion),
ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding]
] = {
val tweetEmbeddingStores: Map[
(EmbeddingType, ModelVersion),
ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding]
] = TweetEmbeddings
.map(embeddingView =>
(
(embeddingView.embeddingType, embeddingView.modelVersion),
rmsStoreBuilder
.buildSimclustersTweetEmbeddingStoreWithEmbeddingIdAsKey(embeddingView))).toMap
val userEmbeddingStores: Map[
(EmbeddingType, ModelVersion),
ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding]
] = UserEmbeddings
.map(embeddingView =>
(
(embeddingView.embeddingType, embeddingView.modelVersion),
rmsStoreBuilder
.buildSimclustersUserEmbeddingStoreWithEmbeddingIdAsKey(embeddingView))).toMap
tweetEmbeddingStores ++ userEmbeddingStores
}
SimClustersEmbeddingStore.buildWithDecider(
underlyingStores = underlyingStores,
decider = decider,
statsReceiver = stats
)
}
}

View File

@ -1,44 +0,0 @@
package com.twitter.simclustersann.modules
import com.twitter.inject.TwitterModule
import com.twitter.simclustersann.common.FlagNames
object FlagsModule extends TwitterModule {
flag[Int](
name = FlagNames.ServiceTimeout,
default = 40,
help = "The threshold of Request Timeout"
)
flag[String](
name = FlagNames.DarkTrafficFilterDeciderKey,
default = "dark_traffic_filter",
help = "Dark traffic filter decider key"
)
flag[String](
name = FlagNames.CacheDest,
default = "/s/cache/content_recommender_unified_v2",
help = "Path to memcache service. Currently using CR uniform scoring cache"
)
flag[Int](
name = FlagNames.CacheTimeout,
default = 15,
help = "The threshold of MemCache Timeout"
)
flag[Boolean](
name = FlagNames.CacheAsyncUpdate,
default = false,
help = "Whether to enable the async update for the MemCache"
)
flag[Int](
name = FlagNames.MaxTopTweetPerCluster,
default = 200,
help = "Maximum number of tweets to take per each simclusters"
)
}

View File

@ -1,27 +0,0 @@
package com.twitter.simclustersann.modules
import com.google.inject.Provides
import com.twitter.inject.TwitterModule
import com.twitter.inject.annotations.Flag
import com.twitter.simclustersann.common.FlagNames.NumberOfThreads
import com.twitter.util.ExecutorServiceFuturePool
import java.util.concurrent.Executors
import javax.inject.Singleton
object FuturePoolProvider extends TwitterModule {
flag[Int](
name = NumberOfThreads,
default = 20,
help = "The number of threads in the future pool."
)
@Singleton
@Provides
def providesFuturePool(
@Flag(NumberOfThreads) numberOfThreads: Int
): ExecutorServiceFuturePool = {
val threadPool = Executors.newFixedThreadPool(numberOfThreads)
new ExecutorServiceFuturePool(threadPool) {
override def toString: String = s"warmup-future-pool-$executor)"
}
}
}

View File

@ -1,23 +0,0 @@
package com.twitter.simclustersann.modules
import com.google.common.util.concurrent.RateLimiter
import com.google.inject.Provides
import com.twitter.inject.TwitterModule
import com.twitter.inject.annotations.Flag
import com.twitter.simclustersann.common.FlagNames.RateLimiterQPS
import javax.inject.Singleton
object RateLimiterModule extends TwitterModule {
flag[Int](
name = RateLimiterQPS,
default = 1000,
help = "The QPS allowed by the rate limiter."
)
@Singleton
@Provides
def providesRateLimiter(
@Flag(RateLimiterQPS) rateLimiterQps: Int
): RateLimiter =
RateLimiter.create(rateLimiterQps)
}

View File

@ -1,15 +0,0 @@
package com.twitter.simclustersann.modules
import com.google.inject.Provides
import com.twitter.inject.TwitterModule
import com.twitter.relevance_platform.simclustersann.multicluster.ServiceNameMapper
import javax.inject.Singleton
object ServiceNameMapperModule extends TwitterModule {
@Singleton
@Provides
def providesServiceNameMapper(
): ServiceNameMapper = {
ServiceNameMapper
}
}

View File

@ -1,47 +0,0 @@
package com.twitter.simclustersann.modules
import com.google.inject.Provides
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.inject.TwitterModule
import com.twitter.simclusters_v2.common.ClusterId
import com.twitter.simclusters_v2.common.SimClustersEmbedding
import com.twitter.simclusters_v2.common.TweetId
import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
import com.twitter.storehaus.ReadableStore
import javax.inject.Singleton
import com.twitter.simclustersann.candidate_source.ApproximateCosineSimilarity
import com.twitter.simclustersann.candidate_source.ExperimentalApproximateCosineSimilarity
import com.twitter.simclustersann.candidate_source.OptimizedApproximateCosineSimilarity
import com.twitter.simclustersann.candidate_source.SimClustersANNCandidateSource
object SimClustersANNCandidateSourceModule extends TwitterModule {
val acsFlag = flag[String](
name = "approximate_cosine_similarity",
default = "original",
help =
"Select different implementations of the approximate cosine similarity algorithm, for testing optimizations",
)
@Singleton
@Provides
def provides(
embeddingStore: ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding],
cachedClusterTweetIndexStore: ReadableStore[ClusterId, Seq[(TweetId, Double)]],
statsReceiver: StatsReceiver
): SimClustersANNCandidateSource = {
val approximateCosineSimilarity = acsFlag() match {
case "original" => ApproximateCosineSimilarity
case "optimized" => OptimizedApproximateCosineSimilarity
case "experimental" => ExperimentalApproximateCosineSimilarity
case _ => ApproximateCosineSimilarity
}
new SimClustersANNCandidateSource(
approximateCosineSimilarity = approximateCosineSimilarity,
clusterTweetCandidatesStore = cachedClusterTweetIndexStore,
simClustersEmbeddingStore = embeddingStore,
statsReceiver = statsReceiver.scope("simClustersANNCandidateSource")
)
}
}

View File

@ -1,20 +0,0 @@
package com.twitter.simclustersann.modules
import com.google.inject.Provides
import javax.inject.Singleton
import com.twitter.inject.TwitterModule
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.strato.client.Client
import com.twitter.strato.client.Strato
object StratoClientProviderModule extends TwitterModule {
@Singleton
@Provides
def providesCache(
serviceIdentifier: ServiceIdentifier,
): Client = Strato.client
.withMutualTls(serviceIdentifier)
.build()
}

View File

@ -1,16 +0,0 @@
create_thrift_libraries(
base_name = "thrift",
sources = ["**/*.thrift"],
platform = "java8",
tags = ["bazel-compatible"],
dependency_roots = [
"finatra-internal/thrift/src/main/thrift",
"src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift",
],
generate_languages = [
"java",
"scala",
],
provides_java_name = "simclusters-ann-thrift-java",
provides_scala_name = "simclusters-ann-thrift-scala",
)

View File

@ -1,59 +0,0 @@
namespace java com.twitter.simclustersann.thriftjava
#@namespace scala com.twitter.simclustersann.thriftscala
include "finatra-thrift/finatra_thrift_exceptions.thrift"
include "com/twitter/simclusters_v2/identifier.thrift"
include "com/twitter/simclusters_v2/score.thrift"
struct Query {
1: required identifier.SimClustersEmbeddingId sourceEmbeddingId;
2: required SimClustersANNConfig config;
}
struct SimClustersANNTweetCandidate {
1: required i64 tweetId (personalDataType = 'TweetId');
2: required double score;
}
struct SimClustersANNConfig {
1: required i32 maxNumResults;
2: required double minScore;
3: required identifier.EmbeddingType candidateEmbeddingType;
4: required i32 maxTopTweetsPerCluster;
5: required i32 maxScanClusters;
6: required i32 maxTweetCandidateAgeHours;
7: required i32 minTweetCandidateAgeHours;
8: required ScoringAlgorithm annAlgorithm;
}
/**
* The algorithm type to identify the score algorithm.
**/
enum ScoringAlgorithm {
DotProduct = 1,
CosineSimilarity = 2,
LogCosineSimilarity = 3,
CosineSimilarityNoSourceEmbeddingNormalization = 4, // Score = (Source dot Candidate) / candidate_l2_norm
}(hasPersonalData = 'false')
enum InvalidResponseParameter {
INVALID_EMBEDDING_TYPE = 1,
INVALID_MODEL_VERSION = 2,
}
exception InvalidResponseParameterException {
1: required InvalidResponseParameter errorCode,
2: optional string message // failure reason
}
service SimClustersANNService {
list<SimClustersANNTweetCandidate> getTweetCandidates(
1: required Query query;
) throws (
1: InvalidResponseParameterException e;
2: finatra_thrift_exceptions.ServerError serverError;
3: finatra_thrift_exceptions.ClientError clientError;
);
}