[docx] split commit for file 1000

Signed-off-by: Ari Archer <ari.web.xyz@gmail.com>
This commit is contained in:
Ari Archer 2024-01-23 19:06:09 +02:00
parent eb3e2f9bcf
commit 7ed90fef05
No known key found for this signature in database
GPG Key ID: A50D5B4B599AF8A2
400 changed files with 0 additions and 10820 deletions

View File

@ -1,20 +0,0 @@
# Scala library compiled from every *.scala file next to this BUILD file.
# Warnings are fatal (fatal_warnings option set), the platform is java8, and the target
# is tagged bazel-compatible.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
# Third-party: Guice (core, assisted-inject, scala-guice) and the slf4j API.
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/com/google/inject/extensions:guice-assistedinject",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/slf4j:slf4j-api",
# Deepbird prediction library and its Java thrift bindings.
"cortex-deepbird/prediction/src/main/scala/com/twitter/cortex/deepbird/prediction",
"cortex-deepbird/thrift/src/main/thrift:thrift-java",
# Finatra mTLS ThriftMux, injection core and thrift-client injection.
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra/inject/inject-core/src/main/scala",
"finatra/inject/inject-thrift-client/src/main/scala",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/common",
"src/scala/com/twitter/ml/api/util",
"util/util-core:scala",
],
)

View File

@ -1,67 +0,0 @@
package com.twitter.follow_recommendations.common.clients.deepbirdv2
import com.google.inject.Provides
import com.google.inject.name.Named
import com.twitter.bijection.scrooge.TBinaryProtocol
import com.twitter.conversions.DurationOps._
import com.twitter.cortex.deepbird.thriftjava.DeepbirdPredictionService
import com.twitter.finagle.ThriftMux
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.mtls.client.MtlsStackClient._
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.thrift.ClientId
import com.twitter.finagle.thrift.RichClientParam
import com.twitter.follow_recommendations.common.constants.GuiceNamedConstants
import com.twitter.inject.TwitterModule
/**
 * Guice module exposing deepbirdv2 prediction service clients.
 *
 * The Java thrift API is used on purpose: data records are native Java objects, and going
 * through the Java client reduces overhead while serializing/deserializing data.
 */
object DeepBirdV2PredictionServiceClientModule extends TwitterModule {

  /** Request timeout applied to every client built by this module. */
  val RequestTimeout = 300.millis

  /**
   * Builds a prediction service client over a ThriftMux stack configured with the given
   * client id and mutual TLS service identifier.
   *
   * @param clientId          client id set on the ThriftMux stack
   * @param label             name given to the built client
   * @param dest              destination path the client connects to
   * @param statsReceiver     parent receiver; client stats are reported under its "clnt" scope
   * @param serviceIdentifier identity used for mutual TLS
   */
  private def getDeepbirdPredictionServiceClient(
    clientId: ClientId,
    label: String,
    dest: String,
    statsReceiver: StatsReceiver,
    serviceIdentifier: ServiceIdentifier
  ): DeepbirdPredictionService.ServiceToClient = {
    val scopedStats = statsReceiver.scope("clnt")
    val thriftMuxStack = ThriftMux.client
      .withClientId(clientId)
      .withMutualTls(serviceIdentifier)

    // Per-host stats are sent to the null receiver, i.e. not reported.
    val service = ClientBuilder()
      .name(label)
      .stack(thriftMuxStack)
      .dest(dest)
      .requestTimeout(RequestTimeout)
      .reportHostStats(NullStatsReceiver)
      .build()

    val clientParam = RichClientParam(
      new TBinaryProtocol.Factory(),
      clientStats = scopedStats
    )

    new DeepbirdPredictionService.ServiceToClient(service, clientParam)
  }

  /** Provides the client bound under GuiceNamedConstants.WTF_PROD_DEEPBIRDV2_CLIENT. */
  @Provides
  @Named(GuiceNamedConstants.WTF_PROD_DEEPBIRDV2_CLIENT)
  def providesWtfProdDeepbirdV2PredictionService(
    clientId: ClientId,
    statsReceiver: StatsReceiver,
    serviceIdentifier: ServiceIdentifier
  ): DeepbirdPredictionService.ServiceToClient =
    getDeepbirdPredictionServiceClient(
      clientId = clientId,
      label = "WtfProdDeepbirdV2PredictionService",
      dest = "/s/cassowary/deepbirdv2-hermit-wtf",
      statsReceiver = statsReceiver,
      serviceIdentifier = serviceIdentifier
    )
}

View File

@ -1,19 +0,0 @@
# Scala library compiled from every *.scala file next to this BUILD file.
# Warnings are fatal (fatal_warnings option set), the platform is java8, and the target
# is tagged bazel-compatible.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
# Third-party: nscala-time, Guice (core, assisted-inject, scala-guice) and the slf4j API.
"3rdparty/jvm/com/github/nscala_time",
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/com/google/inject/extensions:guice-assistedinject",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/slf4j:slf4j-api",
# Follow-recommendations-service packages: strato clients, constants, models, thrift.
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/strato",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/constants",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
"follow-recommendations-service/thrift/src/main/thrift:thrift-scala",
# Thrift definitions for the onboarding relevance store.
"src/thrift/com/twitter/onboarding/relevance/store:store-scala",
"util/util-core:scala",
],
)

View File

@ -1,60 +0,0 @@
package com.twitter.follow_recommendations.common.clients.dismiss_store
import com.twitter.follow_recommendations.common.constants.GuiceNamedConstants
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.onboarding.relevance.store.thriftscala.WhoToFollowDismissEventDetails
import com.twitter.stitch.Stitch
import com.twitter.strato.catalog.Scan.Slice
import com.twitter.strato.client.Scanner
import com.twitter.util.logging.Logging
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
/**
 * Store that returns the list of candidates a user has dismissed, read through a Strato
 * scanner. Primarily used for filtering out accounts that a user has explicitly dismissed.
 *
 * NOTE(review): this header previously stated "we fail open on timeouts, but loudly on other
 * errors". No timeout or rescue handling is visible in this class, so a failed scan propagates
 * to the caller unless the injected scanner handles it — confirm where that happens.
 */
@Singleton
class DismissStore @Inject() (
  @Named(GuiceNamedConstants.DISMISS_STORE_SCANNER)
  scanner: Scanner[
    (Long, Slice[(Long, Long)]),
    Unit,
    (Long, (Long, Long)),
    WhoToFollowDismissEventDetails
  ],
  stats: StatsReceiver)
    extends Logging {

  // Default scan limit used when the caller does not supply one.
  private val MaxCandidatesToReturn = 100

  /**
   * Gets a list of dismissed candidate ids for a user.
   *
   * @param userId                     user whose dismissals are scanned
   * @param negStartTimeMs             first component of the upper bound of the scanned slice;
   *                                   presumably a negated timestamp in milliseconds — TODO confirm
   * @param maxCandidatesToFetchOption scan limit; when None, the default of
   *                                   MaxCandidatesToReturn rows is used
   * @return the candidate id taken from each scanned key, empty if nothing was scanned
   */
  def get(
    userId: Long,
    negStartTimeMs: Long,
    maxCandidatesToFetchOption: Option[Int]
  ): Stitch[Seq[Long]] = {
    val maxCandidatesToFetch = maxCandidatesToFetchOption.getOrElse(MaxCandidatesToReturn)
    val slice: Slice[(Long, Long)] = Slice(
      from = None,
      to = Some((negStartTimeMs, Long.MaxValue)),
      limit = Some(maxCandidatesToFetch)
    )

    // Each row is ((userId, (time, candidateId)), details); only the candidate id is kept.
    // Mapping over an empty result already yields an empty Seq, so no special case is needed.
    scanner.scan((userId, slice)).map { rows =>
      rows.map { case ((_, (_, candidateId)), _) => candidateId }
    }
  }
}

View File

@ -1,14 +0,0 @@
# Scala library compiled from every *.scala file next to this BUILD file.
# Warnings are fatal (fatal_warnings option set), the platform is java8, and the target
# is tagged bazel-compatible.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
# Scala thrift bindings for the email storage service API.
"emailstorage/server/src/main/thrift/com/twitter/emailstorage/api:email-storage-service-scala",
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra/inject/inject-thrift-client",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/common",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
"stitch/stitch-core",
],
)

View File

@ -1,28 +0,0 @@
package com.twitter.follow_recommendations.common.clients.email_storage_service
import com.twitter.cds.contact_consent_state.thriftscala.PurposeOfProcessing
import com.twitter.emailstorage.api.thriftscala.EmailStorageService
import com.twitter.emailstorage.api.thriftscala.GetUsersEmailsRequest
import com.twitter.stitch.Stitch
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Client that reads a user's confirmed email address from the email storage service.
 */
@Singleton
class EmailStorageServiceClient @Inject() (
  val emailStorageService: EmailStorageService.MethodPerEndpoint) {

  /**
   * Looks up the confirmed email of a single user.
   *
   * @param userId              user to look up
   * @param purposeOfProcessing purpose sent along with the request
   * @return the confirmed email of the first entry in the response, or None when the response
   *         contains no entry for the user or that entry has no confirmed email
   */
  def getVerifiedEmail(
    userId: Long,
    purposeOfProcessing: PurposeOfProcessing
  ): Stitch[Option[String]] = {
    val req = GetUsersEmailsRequest(
      userIds = Seq(userId),
      clientIdentifier = Some("follow-recommendations-service"),
      purposesOfProcessing = Some(Seq(purposeOfProcessing))
    )
    Stitch.callFuture(emailStorageService.getUsersEmails(req)).map { response =>
      // headOption instead of head: an empty usersEmails list yields None rather than
      // failing with NoSuchElementException.
      response.usersEmails.headOption.flatMap(_.confirmedEmail.map(_.email))
    }
  }
}

View File

@ -1,12 +0,0 @@
package com.twitter.follow_recommendations.common.clients.email_storage_service
import com.twitter.emailstorage.api.thriftscala.EmailStorageService
import com.twitter.finatra.mtls.thriftmux.modules.MtlsClient
import com.twitter.follow_recommendations.common.clients.common.BaseClientModule
/**
 * Client module for EmailStorageService.MethodPerEndpoint, built on BaseClientModule with the
 * MtlsClient mixin.
 */
object EmailStorageServiceModule
    extends BaseClientModule[EmailStorageService.MethodPerEndpoint]
    with MtlsClient {

  /** Client label. */
  override val label: String = "email-storage-service"

  /** Destination path of the service. */
  override val dest: String = "/s/email-server/email-server"
}

View File

@ -1,22 +0,0 @@
# Scala library compiled from every *.scala file next to this BUILD file.
# Warnings are fatal (fatal_warnings option set), the platform is java8, and the target
# is tagged bazel-compatible.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
# Third-party: nscala-time, Guice (core, assisted-inject, scala-guice) and the slf4j API.
"3rdparty/jvm/com/github/nscala_time",
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/com/google/inject/extensions:guice-assistedinject",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/slf4j:slf4j-api",
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra/inject/inject-core/src/main/scala",
"finatra/inject/inject-thrift-client/src/main/scala",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/common",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
# Geoduck thrift bindings (core and partner places).
"src/thrift/com/twitter/geoduck:geoduck-scala",
"src/thrift/com/twitter/geoduck:geoduckpartnerplaces-thrift-scala",
"stitch/stitch-core",
"util/util-core:scala",
],
)

View File

@ -1,62 +0,0 @@
package com.twitter.follow_recommendations.common.clients.geoduck
import com.twitter.follow_recommendations.common.models.GeohashAndCountryCode
import com.twitter.geoduck.common.thriftscala.LocationSource
import com.twitter.geoduck.common.thriftscala.PlaceQuery
import com.twitter.geoduck.common.thriftscala.TransactionLocation
import com.twitter.geoduck.common.thriftscala.UserLocationRequest
import com.twitter.geoduck.thriftscala.LocationService
import com.twitter.stitch.Stitch
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Client for the geoduck LocationService that resolves a user id to a geohash prefix and a
 * country code.
 */
@Singleton
class LocationServiceClient @Inject() (locationService: LocationService.MethodPerEndpoint) {

  /**
   * Fetches the location of a user and converts it to a GeohashAndCountryCode.
   * Either field is None when the service returns no location for the user or the
   * corresponding piece of data is missing.
   */
  def getGeohashAndCountryCode(userId: Long): Stitch[GeohashAndCountryCode] =
    Stitch.callFuture {
      val request = UserLocationRequest(
        Seq(userId),
        Some(PlaceQuery(allPlaceTypes = Some(true))),
        simpleReverseGeocode = true)

      locationService.userLocation(request).map { response =>
        val locationOpt = response.found.get(userId)
        GeohashAndCountryCode(
          locationOpt.flatMap(getGeohashFromTransactionLocation),
          locationOpt.flatMap(_.simpleRgcResult.flatMap(_.countryCodeAlpha2))
        )
      }
    }

  /**
   * Returns the string geohash of the location, truncated to a prefix whose length depends on
   * the location source. Yields None when the geohash, its string form, or a usable prefix
   * length is unavailable.
   */
  private[this] def getGeohashFromTransactionLocation(
    transactionLocation: TransactionLocation
  ): Option[String] =
    for {
      geohash <- transactionLocation.geohash
      prefixLength <- geohashPrefixLength(transactionLocation)
      hashString <- geohash.stringGeohash
    } yield hashString.take(prefixLength)

  /** Number of leading geohash characters to keep for the given location, if any. */
  private[this] def geohashPrefixLength(transactionLocation: TransactionLocation): Option[Int] =
    transactionLocation.locationSource.flatMap {
      // logical and model sources: keep the first 4 chars in geohash
      case LocationSource.Logical | LocationSource.Model => Some(4)
      // physical source: keep the prefix according to accuracy, where accuracy is the
      // accuracy of GPS readings in the unit of meter
      case LocationSource.Physical =>
        transactionLocation.coordinate.flatMap(_.accuracy).flatMap { accuracy =>
          if (accuracy < 50) Some(7)
          else if (accuracy < 200) Some(6)
          else if (accuracy < 1000) Some(5)
          else if (accuracy < 50000) Some(4)
          else if (accuracy < 100000) Some(3)
          else None
        }
      case _ => None
    }
}

View File

@ -1,12 +0,0 @@
package com.twitter.follow_recommendations.common.clients.geoduck
import com.twitter.finatra.mtls.thriftmux.modules.MtlsClient
import com.twitter.follow_recommendations.common.clients.common.BaseClientModule
import com.twitter.geoduck.thriftscala.LocationService
/**
 * Client module for LocationService.MethodPerEndpoint, built on BaseClientModule with the
 * MtlsClient mixin.
 */
object LocationServiceModule
    extends BaseClientModule[LocationService.MethodPerEndpoint]
    with MtlsClient {

  /** Client label. */
  override val label: String = "geoduck_locationservice"

  /** Destination path of the service. */
  override val dest: String = "/s/geo/geoduck_locationservice"
}

View File

@ -1,57 +0,0 @@
package com.twitter.follow_recommendations.common.clients.geoduck
import com.twitter.follow_recommendations.common.models.GeohashAndCountryCode
import com.twitter.geoduck.common.thriftscala.Location
import com.twitter.geoduck.common.thriftscala.PlaceQuery
import com.twitter.geoduck.common.thriftscala.ReverseGeocodeIPRequest
import com.twitter.geoduck.service.thriftscala.GeoContext
import com.twitter.geoduck.thriftscala.ReverseGeocoder
import com.twitter.stitch.Stitch
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Client for the geoduck ReverseGeocoder that resolves an IP address to a truncated geohash
 * and a country code.
 */
@Singleton
class ReverseGeocodeClient @Inject() (rgcService: ReverseGeocoder.MethodPerEndpoint) {

  /**
   * Reverse-geocodes an IP address. When the response has no entry for the address, both
   * fields of the returned GeohashAndCountryCode are None.
   */
  def getGeohashAndCountryCode(ipAddress: String): Stitch[GeohashAndCountryCode] =
    Stitch.callFuture {
      // note: simpleReverseGeocode means that country code will be included in response
      val request = ReverseGeocodeIPRequest(
        Seq(ipAddress),
        PlaceQuery(None),
        simpleReverseGeocode = true
      )
      rgcService.reverseGeocodeIp(request).map { response =>
        response.found
          .get(ipAddress)
          .map(getGeohashAndCountryCodeFromLocation)
          .getOrElse(GeohashAndCountryCode(None, None))
      }
    }

  /** Extracts the truncated string geohash and the alpha-2 country code from a location. */
  private def getGeohashAndCountryCodeFromLocation(location: Location): GeohashAndCountryCode = {
    val truncatedGeohash: Option[String] =
      for {
        hash <- location.geohash
        hashString <- hash.stringGeohash
      } yield ReverseGeocodeClient.truncate(hashString)
    val countryCode: Option[String] = location.simpleRgcResult.flatMap(_.countryCodeAlpha2)

    GeohashAndCountryCode(truncatedGeohash, countryCode)
  }
}

object ReverseGeocodeClient {

  /** Geo context requesting all place types, the geohash and the country code. */
  val DefaultGeoduckIPRequestContext: GeoContext =
    GeoContext(allPlaceTypes = true, includeGeohash = true, includeCountryCode = true)

  // All these geohashes are guessed by IP (Logical Location Source).
  // So take the four letters to make sure it is consistent with LocationServiceClient
  val GeohashLengthAfterTruncation: Int = 4

  /** Keeps at most the first GeohashLengthAfterTruncation characters of the geohash. */
  def truncate(geohash: String): String =
    geohash.take(GeohashLengthAfterTruncation)
}

View File

@ -1,59 +0,0 @@
package com.twitter.follow_recommendations.common.clients.geoduck
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.follow_recommendations.common.models.GeohashAndCountryCode
import com.twitter.stitch.Stitch
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Combines the user-id based location lookup (LocationServiceClient) with the IP based lookup
 * (ReverseGeocodeClient). Values from the user-id lookup win; the IP lookup fills in whatever
 * is missing.
 */
@Singleton
class UserLocationFetcher @Inject() (
  locationServiceClient: LocationServiceClient,
  reverseGeocodeClient: ReverseGeocodeClient,
  statsReceiver: StatsReceiver) {

  private val stats: StatsReceiver = statsReceiver.scope("user_location_fetcher")
  private val totalRequestsCounter = stats.counter("requests")
  private val emptyResponsesCounter = stats.counter("empty")
  private val locationServiceExceptionCounter = stats.counter("location_service_exception")
  private val reverseGeocodeExceptionCounter = stats.counter("reverse_geocode_exception")

  /**
   * Resolves the geohash and country code for a request.
   *
   * @param userId    when defined, the location service is queried for this user
   * @param ipAddress when defined, the reverse geocoder is queried for this address
   * @return None when neither source yields a geohash or a country code; a lookup that fails
   *         with an Exception is counted and treated as if it had returned nothing
   */
  def getGeohashAndCountryCode(
    userId: Option[Long],
    ipAddress: Option[String]
  ): Stitch[Option[GeohashAndCountryCode]] = {
    totalRequestsCounter.incr()

    val byUserId = Stitch
      .collect(userId.map(locationServiceClient.getGeohashAndCountryCode))
      .rescue {
        case _: Exception =>
          locationServiceExceptionCounter.incr()
          Stitch.None
      }

    val byIpAddress = Stitch
      .collect(ipAddress.map(reverseGeocodeClient.getGeohashAndCountryCode))
      .rescue {
        case _: Exception =>
          reverseGeocodeExceptionCounter.incr()
          Stitch.None
      }

    Stitch.join(byUserId, byIpAddress).map {
      case (userLocation, ipLocation) =>
        val geohash = userLocation.flatMap(_.geohash).orElse(ipLocation.flatMap(_.geohash))
        val countryCode =
          userLocation.flatMap(_.countryCode).orElse(ipLocation.flatMap(_.countryCode))

        if (geohash.isEmpty && countryCode.isEmpty) {
          emptyResponsesCounter.incr()
          None
        } else {
          Some(GeohashAndCountryCode(geohash, countryCode))
        }
    }
  }
}

View File

@ -1,21 +0,0 @@
# Scala library compiled from every *.scala file next to this BUILD file.
# Warnings are fatal (fatal_warnings option set), the platform is java8, and the target
# is tagged bazel-compatible.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
# Third-party: nscala-time, Guice (core, assisted-inject, scala-guice) and the slf4j API.
"3rdparty/jvm/com/github/nscala_time",
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/com/google/inject/extensions:guice-assistedinject",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/slf4j:slf4j-api",
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra/inject/inject-core/src/main/scala",
"finatra/inject/inject-thrift-client/src/main/scala",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/base",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/common",
# Gizmoduck thrift bindings and the Stitch wrapper around them.
"src/thrift/com/twitter/gizmoduck:thrift-scala",
"stitch/stitch-gizmoduck",
"util/util-core:scala",
],
)

View File

@ -1,81 +0,0 @@
package com.twitter.follow_recommendations.common.clients.gizmoduck
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.follow_recommendations.common.base.StatsUtil
import com.twitter.gizmoduck.thriftscala.LookupContext
import com.twitter.gizmoduck.thriftscala.PerspectiveEdge
import com.twitter.gizmoduck.thriftscala.QueryFields
import com.twitter.stitch.Stitch
import com.twitter.stitch.gizmoduck.Gizmoduck
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Thin wrapper around the Stitch Gizmoduck client. Every lookup is passed through
 * StatsUtil.profileStitch to get latency metrics under the "gizmoduck_client" scope.
 */
@Singleton
class GizmoduckClient @Inject() (gizmoduckStitchClient: Gizmoduck, statsReceiver: StatsReceiver) {
  val stats: StatsReceiver = statsReceiver.scope("gizmoduck_client")
  val getByIdStats: StatsReceiver = stats.scope("get_by_id")
  val getUserById: StatsReceiver = stats.scope("get_user_by_id")

  /**
   * Whether the user is protected. Defaults to true when the lookup result carries no user
   * or the user has no safety data.
   */
  def isProtected(userId: Long): Stitch[Boolean] =
    StatsUtil
      .profileStitch(
        gizmoduckStitchClient.getById(userId, Set(QueryFields.Safety)),
        getByIdStats
      )
      .map { result =>
        // forall on an empty Option is true, which gives the "protected by default" answer
        result.user.flatMap(_.safety).forall(_.isProtected)
      }

  /**
   * Fetches the profile name of `userId` as seen from the perspective of `forUserId`.
   * Returns None when the user has no profile.
   */
  def getUserName(userId: Long, forUserId: Long): Stitch[Option[String]] = {
    val context = LookupContext(
      forUserId = Some(forUserId),
      perspectiveEdges = Some(GizmoduckClient.DefaultPerspectiveEdges)
    )
    val lookup = gizmoduckStitchClient.getUserById(
      userId,
      GizmoduckClient.GetUserByIdUserNameQueryFields,
      context
    )
    StatsUtil.profileStitch(lookup, getUserById).map(_.profile.map(_.name))
  }
}

object GizmoduckClient {

  /** Perspective edges sent with lookups; similar to GizmoduckUserRepository.DefaultPerspectiveEdges. */
  val DefaultPerspectiveEdges: Set[PerspectiveEdge] = Set(
    PerspectiveEdge.Blocking,
    PerspectiveEdge.BlockedBy,
    PerspectiveEdge.DeviceFollowing,
    PerspectiveEdge.FollowRequestSent,
    PerspectiveEdge.Following,
    PerspectiveEdge.FollowedBy,
    PerspectiveEdge.LifelineFollowing,
    PerspectiveEdge.LifelineFollowedBy,
    PerspectiveEdge.Muting,
    PerspectiveEdge.NoRetweetsFrom
  )

  /** Full set of query fields, taken from GizmoduckUserRepository.DefaultQueryFields. */
  val GetUserByIdQueryFields: Set[QueryFields] =
    Set(
      QueryFields.Account,
      QueryFields.Counts,
      QueryFields.ExtendedProfile,
      QueryFields.Perspective,
      QueryFields.Profile,
      QueryFields.ProfileDesign,
      QueryFields.ProfileLocation,
      QueryFields.Safety,
      QueryFields.Roles,
      QueryFields.Takedowns,
      QueryFields.UrlEntities,
      QueryFields.DirectMessageView,
      QueryFields.MediaView
    )

  /** Query fields used when only the profile (and thus the name) is needed. */
  val GetUserByIdUserNameQueryFields: Set[QueryFields] =
    Set(QueryFields.Profile)
}

View File

@ -1,24 +0,0 @@
package com.twitter.follow_recommendations.common.clients.gizmoduck
import com.google.inject.Provides
import com.twitter.finatra.mtls.thriftmux.modules.MtlsClient
import com.twitter.follow_recommendations.common.clients.common.BaseClientModule
import com.twitter.gizmoduck.thriftscala.QueryFields
import com.twitter.gizmoduck.thriftscala.UserService
import com.twitter.stitch.gizmoduck.Gizmoduck
import javax.inject.Singleton
/**
 * Client module for the Gizmoduck UserService. Besides the thrift client from
 * BaseClientModule, it provides the Stitch Gizmoduck wrapper and an empty set of extra
 * query fields.
 */
object GizmoduckModule extends BaseClientModule[UserService.MethodPerEndpoint] with MtlsClient {

  /** Client label. */
  override val label: String = "gizmoduck"

  /** Destination path of the service. */
  override val dest: String = "/s/gizmoduck/gizmoduck"

  /** No extra query fields are bound. */
  @Provides
  @Singleton
  def provideExtraGizmoduckQueryFields: Set[QueryFields] =
    Set.empty[QueryFields]

  /** Wraps the thrift client in the Stitch Gizmoduck client. */
  @Provides
  @Singleton
  def providesStitchClient(futureIface: UserService.MethodPerEndpoint): Gizmoduck =
    Gizmoduck(futureIface)
}

View File

@ -1,14 +0,0 @@
# Scala library compiled from every *.scala file next to this BUILD file.
# Warnings are fatal (fatal_warnings option set), the platform is java8, and the target
# is tagged bazel-compatible.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra/inject/inject-thrift-client",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/common",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
# Scala thrift bindings for the graph feature service.
"graph-feature-service/src/main/thrift/com/twitter/graph_feature_service:graph_feature_service_thrift-scala",
"stitch/stitch-core",
],
)

View File

@ -1,50 +0,0 @@
package com.twitter.follow_recommendations.common.clients.graph_feature_service
import com.twitter.follow_recommendations.common.models.FollowProof
import com.twitter.graph_feature_service.thriftscala.PresetFeatureTypes.WtfTwoHop
import com.twitter.graph_feature_service.thriftscala.EdgeType
import com.twitter.graph_feature_service.thriftscala.GfsIntersectionResponse
import com.twitter.graph_feature_service.thriftscala.GfsPresetIntersectionRequest
import com.twitter.graph_feature_service.thriftscala.{Server => GraphFeatureService}
import com.twitter.stitch.Stitch
import javax.inject.{Inject, Singleton}
/**
 * Client for the graph feature service that turns WtfTwoHop preset intersections into
 * follow proofs for a list of candidates.
 */
@Singleton
class GraphFeatureServiceClient @Inject() (
  graphFeatureService: GraphFeatureService.MethodPerEndpoint) {
  import GraphFeatureServiceClient._

  /**
   * Fetches the intersections between a user and each candidate.
   *
   * @param userId             user the intersections are computed for
   * @param candidateIds       candidates to intersect with
   * @param numIntersectionIds passed to the service as the optional fourth request argument;
   *                           presumably caps the number of intersection ids — TODO confirm
   * @return a follow proof per candidate that has an intersection whose edge types are in
   *         leftEdgeTypes / rightEdgeTypes and that carries intersection ids; candidates
   *         without one are absent from the map. If a candidate has several matching
   *         intersections, the last one wins.
   */
  def getIntersections(
    userId: Long,
    candidateIds: Seq[Long],
    numIntersectionIds: Int
  ): Stitch[Map[Long, FollowProof]] = {
    Stitch
      .callFuture(
        graphFeatureService.getPresetIntersection(
          GfsPresetIntersectionRequest(userId, candidateIds, WtfTwoHop, Some(numIntersectionIds))
        )
      ).map {
        case GfsIntersectionResponse(gfsIntersectionResults) =>
          // Group once instead of re-filtering the whole result list for every candidate.
          // groupBy keeps the original relative order inside each group, so the
          // "last one wins" behavior of toMap is unchanged.
          val resultsByCandidate = gfsIntersectionResults.groupBy(_.candidateUserId)

          (for {
            candidateId <- candidateIds
            result <- resultsByCandidate.getOrElse(candidateId, Nil)
            intersection <- result.intersectionValues
            if leftEdgeTypes.contains(intersection.featureType.leftEdgeType)
            if rightEdgeTypes.contains(intersection.featureType.rightEdgeType)
            intersectionIds <- intersection.intersectionIds.toSeq
          } yield {
            candidateId -> FollowProof(intersectionIds, intersection.count.getOrElse(0))
          }).toMap
      }
  }
}

object GraphFeatureServiceClient {

  /** Left edge types an intersection must have to be turned into a follow proof. */
  val leftEdgeTypes: Set[EdgeType] = Set(EdgeType.Following)

  /** Right edge types an intersection must have to be turned into a follow proof. */
  val rightEdgeTypes: Set[EdgeType] = Set(EdgeType.FollowedBy)
}

View File

@ -1,12 +0,0 @@
package com.twitter.follow_recommendations.common.clients.graph_feature_service
import com.twitter.finatra.mtls.thriftmux.modules.MtlsClient
import com.twitter.follow_recommendations.common.clients.common.BaseClientModule
import com.twitter.graph_feature_service.thriftscala.{Server => GraphFeatureService}
/**
 * Client module for GraphFeatureService.MethodPerEndpoint, built on BaseClientModule with the
 * MtlsClient mixin.
 */
object GraphFeatureStoreModule
    extends BaseClientModule[GraphFeatureService.MethodPerEndpoint]
    with MtlsClient {

  /** Client label. */
  override val label: String = "graph_feature_service"

  /** Destination path of the service. */
  override val dest: String = "/s/cassowary/graph_feature_service-server"
}

View File

@ -1,18 +0,0 @@
# Scala library compiled from every *.scala file next to this BUILD file.
# Warnings are fatal (fatal_warnings option set), the platform is java8, and the target
# is tagged bazel-compatible.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
# Third-party: nscala-time, Guice (core, assisted-inject, scala-guice) and the slf4j API.
"3rdparty/jvm/com/github/nscala_time",
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/com/google/inject/extensions:guice-assistedinject",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/slf4j:slf4j-api",
# Follow-recommendations-service packages: strato clients, models, thrift.
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/strato",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
"follow-recommendations-service/thrift/src/main/thrift:thrift-scala",
"stitch/stitch-socialgraph",
"util/util-core:scala",
],
)

View File

@ -1,31 +0,0 @@
package com.twitter.follow_recommendations.common.clients.impression_store
import com.google.inject.Provides
import com.google.inject.Singleton
import com.twitter.follow_recommendations.thriftscala.DisplayLocation
import com.twitter.inject.TwitterModule
import com.twitter.strato.catalog.Scan.Slice
import com.twitter.strato.client.Client
import com.twitter.strato.thrift.ScroogeConvImplicits._
/**
 * Guice module that provides the WtfImpressionStore, backed by a Strato scanner over the
 * column at `columnPath`.
 */
object ImpressionStoreModule extends TwitterModule {

  /** Strato column scanned by the store. */
  val columnPath: String = "onboarding/userrecs/wtfImpressionCountsStore"

  /** Partition key: a Long paired with a display location. */
  type PKey = (Long, DisplayLocation)

  /** Local key within a partition. */
  type LKey = Long

  /** Stored value: a Long paired with an Int. */
  type Value = (Long, Int)

  /** Builds the store from a scanner created on the given Strato client. */
  @Provides
  @Singleton
  def providesImpressionStore(stratoClient: Client): WtfImpressionStore = {
    val impressionScanner =
      stratoClient.scanner[(PKey, Slice[LKey]), Unit, (PKey, LKey), Value](columnPath)
    new WtfImpressionStore(impressionScanner)
  }
}

View File

@ -1,42 +0,0 @@
package com.twitter.follow_recommendations.common.clients.impression_store
import com.twitter.follow_recommendations.common.models.DisplayLocation
import com.twitter.follow_recommendations.common.models.WtfImpression
import com.twitter.follow_recommendations.thriftscala.{DisplayLocation => TDisplayLocation}
import com.twitter.stitch.Stitch
import com.twitter.strato.catalog.Scan.Slice
import com.twitter.strato.client.Scanner
import com.twitter.util.Time
import com.twitter.util.logging.Logging
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Store that reads, through a Strato scanner, the impressions recorded for a user at a given
 * display location.
 */
@Singleton
class WtfImpressionStore @Inject() (
  scanner: Scanner[
    ((Long, TDisplayLocation), Slice[Long]),
    Unit,
    ((Long, TDisplayLocation), Long),
    (Long, Int)
  ]) extends Logging {

  /**
   * Scans all impressions for the (user, display location) pair.
   *
   * @param userId user whose impressions are read
   * @param dl     display location; converted to its thrift form for the scan key
   * @return one WtfImpression per scanned row; an empty list if the scan fails with a
   *         non-fatal error (the failure is logged as a warning)
   */
  def get(userId: Long, dl: DisplayLocation): Stitch[Seq[WtfImpression]] = {
    import scala.util.control.NonFatal

    val thriftDl = dl.toThrift
    scanner
      .scan(((userId, thriftDl), Slice.all[Long]))
      .map { impressionsPerDl =>
        // row shape: ((partition key, candidateId), (latest timestamp in ms, count))
        impressionsPerDl.map {
          case ((_, candidateId), (latestTs, counts)) =>
            WtfImpression(
              candidateId = candidateId,
              displayLocation = dl,
              latestTime = Time.fromMilliseconds(latestTs),
              counts = counts
            )
        }
      }
      .rescue {
        // fail open so that the request can still go through; fatal errors (e.g.
        // VirtualMachineError, InterruptedException) are deliberately not swallowed
        case NonFatal(ex) =>
          logger.warn(s"$dl WtfImpressionsStore warn: " + ex.getMessage)
          Stitch.Nil
      }
  }
}

View File

@ -1,14 +0,0 @@
# Scala library named "interests_service", compiled from InterestServiceClient.scala only.
# Built for the java8 platform and tagged bazel-compatible; unlike the sibling targets in
# this change, no compiler_option_sets are configured here.
scala_library(
name = "interests_service",
sources = ["InterestServiceClient.scala"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/interests",
"interests-service/thrift/src/main/thrift:thrift-scala",
# Strato catalog, client, data and thrift support.
"strato/src/main/scala/com/twitter/strato/catalog",
"strato/src/main/scala/com/twitter/strato/client",
"strato/src/main/scala/com/twitter/strato/data",
"strato/src/main/scala/com/twitter/strato/thrift",
],
)

View File

@ -1,115 +0,0 @@
package com.twitter.follow_recommendations.common.clients.interests_service
import com.google.inject.Inject
import com.google.inject.Singleton
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.frigate.common.store.InterestedInInterestsFetchKey
import com.twitter.inject.Logging
import com.twitter.interests.thriftscala.InterestId
import com.twitter.interests.thriftscala.InterestRelationship
import com.twitter.interests.thriftscala.InterestedInInterestModel
import com.twitter.interests.thriftscala.UserInterest
import com.twitter.interests.thriftscala.UserInterestData
import com.twitter.interests.thriftscala.UserInterestsResponse
import com.twitter.stitch.Stitch
import com.twitter.strato.client.Client
import com.twitter.strato.thrift.ScroogeConvImplicits._
/**
 * Client that reads a user's explicit interests from the Strato column
 * "interests/interestedInInterests" and exposes them as semantic-core ids or free-form
 * strings. Fetch failures are swallowed: they are logged, counted and reported as "no data".
 */
@Singleton
class InterestServiceClient @Inject() (
  stratoClient: Client,
  statsReceiver: StatsReceiver = NullStatsReceiver)
    extends Logging {
  import scala.util.control.NonFatal

  val interestsServiceStratoColumnPath = "interests/interestedInInterests"
  val stats = statsReceiver.scope("interest_service_client")
  val errorCounter = stats.counter("error")

  private val interestsFetcher =
    stratoClient.fetcher[InterestedInInterestsFetchKey, UserInterestsResponse](
      interestsServiceStratoColumnPath,
      checkTypes = true
    )

  /** Semantic-core interest ids of the user; empty when nothing could be fetched. */
  def fetchUttInterestIds(
    userId: Long
  ): Stitch[Seq[Long]] = {
    fetchInterestRelationships(userId)
      .map(_.toSeq.flatten.flatMap(extractUttInterest))
  }

  /** The semantic-core id of a V1 relationship, or None for any other kind of relationship. */
  def extractUttInterest(
    interestRelationShip: InterestRelationship
  ): Option[Long] =
    v1InterestId(interestRelationShip).collect {
      case InterestId.SemanticCore(semanticCoreInterest) => semanticCoreInterest.id
    }

  /** Free-form interests of the user; empty when nothing could be fetched. */
  def fetchCustomInterests(
    userId: Long
  ): Stitch[Seq[String]] = {
    fetchInterestRelationships(userId)
      .map(_.toSeq.flatten.flatMap(extractCustomInterest))
  }

  /** The free-form text of a V1 relationship, or None for any other kind of relationship. */
  def extractCustomInterest(
    interestRelationShip: InterestRelationship
  ): Option[String] =
    v1InterestId(interestRelationShip).collect {
      case InterestId.FreeForm(freeFormInterest) => freeFormInterest.interest
    }

  /**
   * Fetches the explicit interest relationships of a user.
   *
   * @return None when the column has no value for the user, the response carries no interest
   *         list, or the fetch failed with a non-fatal error
   */
  def fetchInterestRelationships(
    userId: Long
  ): Stitch[Option[Seq[InterestRelationship]]] = {
    interestsFetcher
      .fetch(
        InterestedInInterestsFetchKey(
          userId = userId,
          labels = None,
          None
        ))
      .map { result =>
        result.v.flatMap { response =>
          response.interests.interests.map { interests =>
            // only interests that carry data contribute relationships
            interests.collect {
              case UserInterest(_, Some(interestData)) =>
                getInterestRelationship(interestData)
            }.flatten
          }
        }
      }
      .rescue {
        // we are swallowing all non-fatal errors; fatal ones are left to propagate
        case NonFatal(e) =>
          // Log the exception itself: getCause is null for failures that do not wrap
          // another exception, which made the old message end in "due to null".
          logger.warn(s"interests could not be retrieved for user $userId due to $e")
          errorCounter.incr()
          Stitch.None
      }
  }

  /** Interest id of a V1 relationship; None for any other relationship variant. */
  private def v1InterestId(relationship: InterestRelationship): Option[InterestId] =
    relationship match {
      case InterestRelationship.V1(relationshipV1) => Some(relationshipV1.interestId)
      case _ => None
    }

  /** Explicit-model relationships contained in "interested in" data; Nil for other data. */
  private def getInterestRelationship(
    interestData: UserInterestData
  ): Seq[InterestRelationship] = {
    interestData match {
      case UserInterestData.InterestedIn(interestModels) =>
        interestModels.collect {
          case InterestedInInterestModel.ExplicitModel(model) => model
        }
      case _ => Nil
    }
  }
}

View File

@ -1,14 +0,0 @@
# Scala library compiled from every *.scala file next to this BUILD file.
# Warnings are fatal (fatal_warnings option set), the platform is java8, and the target
# is tagged bazel-compatible.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra/inject/inject-thrift-client",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/common",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
# Scala thrift bindings for the phone storage service API.
"phonestorage/server/src/main/thrift/com/twitter/phonestorage/api:phone-storage-service-scala",
"stitch/stitch-core",
],
)

View File

@ -1,34 +0,0 @@
package com.twitter.follow_recommendations.common.clients.phone_storage_service
import com.twitter.cds.contact_consent_state.thriftscala.PurposeOfProcessing
import com.twitter.phonestorage.api.thriftscala.GetUserPhonesByUsersRequest
import com.twitter.phonestorage.api.thriftscala.PhoneStorageService
import com.twitter.stitch.Stitch
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Client that reads the phone numbers stored for a user from the phone storage service (PSS).
 */
@Singleton
class PhoneStorageServiceClient @Inject() (
  val phoneStorageService: PhoneStorageService.MethodPerEndpoint) {

  /**
   * Returns the phone numbers of a single user.
   *
   * PSS can potentially return multiple phone records. The current implementation of
   * getUserPhonesByUsers returns only a single phone for a single user_id, but every record in
   * the response is mapped here, so multiple records are handled if that changes in the future.
   *
   * @param userId              user to look up
   * @param purposeOfProcessing purpose sent along with the request
   * @param forceCarrierLookup  forwarded unchanged to the request
   */
  def getPhoneNumbers(
    userId: Long,
    purposeOfProcessing: PurposeOfProcessing,
    forceCarrierLookup: Option[Boolean] = None
  ): Stitch[Seq[String]] = {
    val request = GetUserPhonesByUsersRequest(
      userIds = Seq(userId),
      forceCarrierLookup = forceCarrierLookup,
      purposesOfProcessing = Some(Seq(purposeOfProcessing))
    )
    Stitch
      .callFuture(phoneStorageService.getUserPhonesByUsers(request))
      .map { response =>
        response.userPhones.map(_.phoneNumber)
      }
  }
}

View File

@ -1,12 +0,0 @@
package com.twitter.follow_recommendations.common.clients.phone_storage_service
import com.twitter.finatra.mtls.thriftmux.modules.MtlsClient
import com.twitter.follow_recommendations.common.clients.common.BaseClientModule
import com.twitter.phonestorage.api.thriftscala.PhoneStorageService
/**
 * Client module for PhoneStorageService.MethodPerEndpoint, built on BaseClientModule with the
 * MtlsClient mixin.
 */
object PhoneStorageServiceModule
    extends BaseClientModule[PhoneStorageService.MethodPerEndpoint]
    with MtlsClient {

  /** Client label. */
  override val label: String = "phone-storage-service"

  /** Destination path of the service. */
  override val dest: String = "/s/ibis-ds-api/ibis-ds-api:thrift2"
}

View File

@ -1,20 +0,0 @@
# Scala library target.
# NOTE(review): presumably this is the BUILD target of the real_time_real_graph client package
# (the sources that follow it in this listing) — confirm.
# NOTE(review): unlike sibling targets that declare sources = ["*.scala"], this target sets no
# `sources` attribute; presumably the build tool's default is relied on — confirm.
scala_library(
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/com/google/inject/extensions:guice-assistedinject",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/slf4j:slf4j-api",
"finatra/inject/inject-core/src/main/scala",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/base",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/strato",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/constants",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
# Generated Strato client columns.
"strato/config/columns/ml/featureStore:featureStore-strato-client",
"strato/config/columns/onboarding/userrecs:userrecs-strato-client",
"strato/src/main/scala/com/twitter/strato/client",
"util/util-slf4j-api/src/main/scala",
],
)

View File

@ -1,14 +0,0 @@
package com.twitter.follow_recommendations.common.clients.real_time_real_graph
/**
 * Closed set of engagement kinds. The members live in the companion object and are declared as
 * plain `object`s (not `case object`s).
 */
sealed trait EngagementType
// SoftFollow is intentionally not included because it is deprecated.
object EngagementType {
object Click extends EngagementType
object Like extends EngagementType
object Mention extends EngagementType
object Retweet extends EngagementType
object ProfileView extends EngagementType
}
/**
 * A single engagement of the given type.
 *
 * @param engagementType kind of engagement
 * @param timestamp when the engagement happened
 *                  (NOTE(review): presumably epoch milliseconds — confirm against the producers)
 */
case class Engagement(engagementType: EngagementType, timestamp: Long)

View File

@ -1,58 +0,0 @@
package com.twitter.follow_recommendations.common.clients.real_time_real_graph
import com.twitter.conversions.DurationOps._
import com.twitter.util.Time
object EngagementScorer {
  // Time after which an engagement's contribution has halved.
  private[real_time_real_graph] val MemoryDecayHalfLife = 24.hour
  // Base of the exponential decay: one half per elapsed half-life.
  private val ScoringFunctionBase = 0.5

  /**
   * Scores each user by the sum of the time-decayed scores of their engagements.
   *
   * @param engagements engagements grouped by user id
   * @param engagementScoreMap base score per engagement type; types missing from it score 0.0
   * @param minScore users are kept only when their total score is strictly greater than this
   * @return (user id, total score, engagement proof) triples sorted by descending total score
   */
  def apply(
    engagements: Map[Long, Seq[Engagement]],
    engagementScoreMap: Map[EngagementType, Double],
    minScore: Double = 0.0
  ): Seq[(Long, Double, Seq[EngagementType])] = {
    // A single reference instant is used so that all engagements decay consistently.
    val now = Time.now
    val scoredUsers = for {
      (uid, userEngagements) <- engagements.toList
      total = userEngagements.map(score(_, now, engagementScoreMap)).sum
      if total > minScore
    } yield (uid, total, getEngagementProof(userEngagements, engagementScoreMap))
    scoredUsers.sortBy { case (_, total, _) => -total }
  }

  /**
   * Scores one engagement: its base score halved once per elapsed `MemoryDecayHalfLife`. This
   * loosely models the human memory forgetting curve, see
   * https://en.wikipedia.org/wiki/Forgetting_curve
   *
   * Engagements with a timestamp later than `now` are treated as having zero age.
   */
  private[real_time_real_graph] def score(
    engagement: Engagement,
    now: Time,
    engagementScoreMap: Map[EngagementType, Double]
  ): Double = {
    val baseScore = engagementScoreMap.getOrElse(engagement.engagementType, 0.0)
    val elapsedMillis = math.max(now.inMillis - engagement.timestamp, 0)
    val elapsedHalfLives = elapsedMillis.toDouble / MemoryDecayHalfLife.inMillis
    baseScore * math.pow(ScoringFunctionBase, elapsedHalfLives)
  }

  /**
   * Picks the engagement type reported as proof: the type of the first engagement that is not a
   * Click and has a positive base score, or Click when there is none. The result always holds
   * exactly one element.
   */
  private def getEngagementProof(
    engagements: Seq[Engagement],
    engagementScoreMap: Map[EngagementType, Double]
  ): Seq[EngagementType] = {
    def isScoredNonClick(engagementType: EngagementType): Boolean =
      engagementType != EngagementType.Click &&
        engagementScoreMap.get(engagementType).exists(_ > 0.0)

    val proof = engagements.iterator
      .map(_.engagementType)
      .find(isScoredNonClick)
      .getOrElse(EngagementType.Click)
    Seq(proof)
  }
}

View File

@ -1,128 +0,0 @@
package com.twitter.follow_recommendations.common.clients.real_time_real_graph
import com.google.inject.Inject
import com.google.inject.Singleton
import com.twitter.conversions.DurationOps._
import com.twitter.follow_recommendations.common.models.CandidateUser
import com.twitter.snowflake.id.SnowflakeId
import com.twitter.stitch.Stitch
import com.twitter.strato.generated.client.ml.featureStore.TimelinesUserVertexOnUserClientColumn
import com.twitter.strato.generated.client.onboarding.userrecs.RealGraphScoresMhOnUserClientColumn
import com.twitter.util.Duration
import com.twitter.util.Time
import com.twitter.wtf.real_time_interaction_graph.thriftscala._
/**
 * Reads a user's outgoing interactions and real graph scores from two Strato columns and turns
 * them into engagements, candidate users and weights.
 */
@Singleton
class RealTimeRealGraphClient @Inject() (
  timelinesUserVertexOnUserClientColumn: TimelinesUserVertexOnUserClientColumn,
  realGraphScoresMhOnUserClientColumn: RealGraphScoresMhOnUserClientColumn) {

  /**
   * Converts the outgoing interactions of `userVertex` into engagements, keyed like
   * `outgoingInteractionMap`.
   *
   * Interactions that `RealTimeRealGraphClient.toEngagement` does not map are dropped, as are
   * engagements older than `RealTimeRealGraphClient.MaxEngagementAge`. Keys whose engagements
   * are all dropped stay in the result with an empty sequence.
   */
  def mapUserVertexToEngagementAndFilter(userVertex: UserVertex): Map[Long, Seq[Engagement]] = {
    val minTimestamp = (Time.now - RealTimeRealGraphClient.MaxEngagementAge).inMillis
    userVertex.outgoingInteractionMap.mapValues { interactions =>
      interactions
        .flatMap { interaction => RealTimeRealGraphClient.toEngagement(interaction) }
        .filter(_.timestamp >= minTimestamp)
    }.toMap
  }

  /**
   * Fetches the user's vertex and returns only its recent ProfileView engagements, keyed like
   * [[mapUserVertexToEngagementAndFilter]]. Keys without any ProfileView engagement are kept
   * with an empty sequence.
   *
   * @return an empty map when the column holds no value for `userId`
   */
  def getRecentProfileViewEngagements(userId: Long): Stitch[Map[Long, Seq[Engagement]]] = {
    timelinesUserVertexOnUserClientColumn.fetcher
      .fetch(userId)
      .map(_.v)
      .map { userVertexOpt =>
        userVertexOpt
          .map { userVertex =>
            // Build a strict map: `mapValues` on an immutable Map only yields a lazy view that
            // would re-run the filter every time the returned map is read.
            mapUserVertexToEngagementAndFilter(userVertex).map {
              case (targetId, engagements) =>
                targetId -> engagements.filter(_.engagementType == EngagementType.ProfileView)
            }
          }.getOrElse(Map.empty[Long, Seq[Engagement]])
      }
  }

  /**
   * Returns the users that `userId` recently engaged with, scored by [[EngagementScorer]].
   *
   * When the Snowflake id shows that the user signed up less than
   * `RealTimeRealGraphClient.MaxNewUserAge` ago, ProfileView engagements are scored as well,
   * with `RealTimeRealGraphClient.ProfileViewScore`.
   *
   * Scored users are split into "direct" ones (their id is among the candidates of the real
   * graph scores column) and "non-direct" ones. The result lists the non-direct users first,
   * then at most `RealTimeRealGraphClient.MaxNumDirectFollow` direct users; each part is
   * included only when its flag is set.
   *
   * @param engagementScoreMap base score per engagement type
   * @param includeDirectFollowCandidates whether direct users are part of the result
   * @param includeNonDirectFollowCandidates whether non-direct users are part of the result
   * @return the candidates with their score; empty when either column has no value for the user
   */
  def getUsersRecentlyEngagedWith(
    userId: Long,
    engagementScoreMap: Map[EngagementType, Double],
    includeDirectFollowCandidates: Boolean,
    includeNonDirectFollowCandidates: Boolean
  ): Stitch[Seq[CandidateUser]] = {
    val isNewUser =
      SnowflakeId.timeFromIdOpt(userId).exists { signupTime =>
        (Time.now - signupTime) < RealTimeRealGraphClient.MaxNewUserAge
      }
    val updatedEngagementScoreMap =
      if (isNewUser)
        engagementScoreMap + (EngagementType.ProfileView -> RealTimeRealGraphClient.ProfileViewScore)
      else engagementScoreMap

    Stitch
      .join(
        timelinesUserVertexOnUserClientColumn.fetcher.fetch(userId).map(_.v),
        realGraphScoresMhOnUserClientColumn.fetcher.fetch(userId).map(_.v))
      .map {
        case (Some(userVertex), Some(neighbors)) =>
          val engagements = mapUserVertexToEngagementAndFilter(userVertex)
          val candidatesAndScores: Seq[(Long, Double, Seq[EngagementType])] =
            EngagementScorer.apply(engagements, engagementScoreMap = updatedEngagementScoreMap)
          val directNeighbors = neighbors.candidates.map(_._1).toSet
          val (directFollows, nonDirectFollows) = candidatesAndScores.partition {
            case (id, _, _) => directNeighbors.contains(id)
          }
          val candidates =
            (if (includeNonDirectFollowCandidates) nonDirectFollows else Seq.empty) ++
              (if (includeDirectFollowCandidates)
                 directFollows.take(RealTimeRealGraphClient.MaxNumDirectFollow)
               else Seq.empty)
          // The engagement proof is not carried over to CandidateUser.
          candidates.map {
            case (id, score, _) =>
              CandidateUser(id, Some(score))
          }
        // Either column has no value for this user.
        case _ => Nil
      }
  }

  /**
   * Fetches the real graph scores of `userId` as a map from candidate user id to score.
   *
   * @return an empty map when the column holds no value for `userId`
   */
  def getRealGraphWeights(userId: Long): Stitch[Map[Long, Double]] =
    realGraphScoresMhOnUserClientColumn.fetcher
      .fetch(userId)
      .map(
        _.v
          .map(_.candidates.map(candidate => (candidate.userId, candidate.score)).toMap)
          .getOrElse(Map.empty[Long, Double]))
}
/** Constants and the interaction-to-engagement conversion used by the RealTimeRealGraphClient class. */
object RealTimeRealGraphClient {
/**
 * Maps a thrift Interaction to an Engagement carrying the interaction's timestamp.
 * Returns None for every interaction kind not listed below.
 */
private def toEngagement(interaction: Interaction): Option[Engagement] = {
// We do not include SoftFollow since it's deprecated
interaction match {
case Interaction.Retweet(Retweet(timestamp)) =>
Some(Engagement(EngagementType.Retweet, timestamp))
// A thrift Favorite is reported as a Like engagement.
case Interaction.Favorite(Favorite(timestamp)) =>
Some(Engagement(EngagementType.Like, timestamp))
case Interaction.Click(Click(timestamp)) => Some(Engagement(EngagementType.Click, timestamp))
case Interaction.Mention(Mention(timestamp)) =>
Some(Engagement(EngagementType.Mention, timestamp))
case Interaction.ProfileView(ProfileView(timestamp)) =>
Some(Engagement(EngagementType.ProfileView, timestamp))
case _ => None
}
}
// Upper bound on the number of direct-follow candidates returned by getUsersRecentlyEngagedWith.
val MaxNumDirectFollow = 50
// Engagements older than this are dropped by mapUserVertexToEngagementAndFilter.
val MaxEngagementAge: Duration = 14.days
// Users whose signup time is more recent than this are treated as new users.
val MaxNewUserAge: Duration = 30.days
// Base score given to ProfileView engagements for new users.
val ProfileViewScore = 0.4
// Base scores per engagement type; types not listed here have no entry.
val EngagementScoreMap = Map(
EngagementType.Like -> 1.0,
EngagementType.Retweet -> 1.0,
EngagementType.Mention -> 1.0
)
// Same as EngagementScoreMap but without Mention.
val StrongEngagementScoreMap = Map(
EngagementType.Like -> 1.0,
EngagementType.Retweet -> 1.0,
)
}

View File

@ -1,26 +0,0 @@
# Scala library target compiling every *.scala file of its directory.
# NOTE(review): presumably the BUILD target of the socialgraph client package (the sources that
# follow it in this listing) — confirm.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/github/nscala_time",
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/com/google/inject/extensions:guice-assistedinject",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/slf4j:slf4j-api",
"escherbird/src/scala/com/twitter/escherbird/util/stitchcache",
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra/inject/inject-core/src/main/scala",
"finatra/inject/inject-thrift-client/src/main/scala",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/base",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/common",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
# Social graph thrift definitions, utilities and Stitch client.
"socialgraph/server/src/main/scala/com/twitter/socialgraph/util",
"src/thrift/com/twitter/socialgraph:thrift-scala",
"stitch/stitch-socialgraph",
"strato/config/columns/onboarding/socialGraphService:socialGraphService-strato-client",
"strato/src/main/scala/com/twitter/strato/client",
"util/util-core:scala",
],
)

View File

@ -1,421 +0,0 @@
package com.twitter.follow_recommendations.common.clients.socialgraph
import com.twitter.escherbird.util.stitchcache.StitchCache
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.follow_recommendations.common.base.StatsUtil
import com.twitter.follow_recommendations.common.models.FollowProof
import com.twitter.follow_recommendations.common.models.UserIdWithTimestamp
import com.twitter.inject.Logging
import com.twitter.socialgraph.thriftscala.EdgesRequest
import com.twitter.socialgraph.thriftscala.IdsRequest
import com.twitter.socialgraph.thriftscala.IdsResult
import com.twitter.socialgraph.thriftscala.LookupContext
import com.twitter.socialgraph.thriftscala.OverCapacity
import com.twitter.socialgraph.thriftscala.PageRequest
import com.twitter.socialgraph.thriftscala.RelationshipType
import com.twitter.socialgraph.thriftscala.SrcRelationship
import com.twitter.socialgraph.util.ByteBufferUtil
import com.twitter.stitch.Stitch
import com.twitter.stitch.socialgraph.SocialGraph
import com.twitter.strato.client.Fetcher
import com.twitter.strato.generated.client.onboarding.socialGraphService.IdsClientColumn
import com.twitter.util.Duration
import com.twitter.util.Time
import java.nio.ByteBuffer
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Query for the ids of a user's recent edges over one or more relationship types.
 *
 * @param userId source user of the edges
 * @param relations relationship types to look up; one SrcRelationship is sent per type
 * @param count page size sent to SGS
 * @param performUnion forwarded as LookupContext.performUnion
 * @param recentEdgesWindowOpt when set, a cursor derived from this window is sent and
 *                             `selectAll` is enabled on the page request
 * @param targets optional target user ids forwarded on every SrcRelationship
 */
case class RecentEdgesQuery(
userId: Long,
relations: Seq[RelationshipType],
// Prefer the default value so that equal queries can better share the Stitch cache.
count: Option[Int] = Some(SocialGraphClient.MaxQuerySize),
performUnion: Boolean = true,
recentEdgesWindowOpt: Option[Duration] = None,
targets: Option[Seq[Long]] = None)
/**
 * Query for a user's recent edges (with edge metadata) over a single relationship type.
 *
 * @param userId source user of the edges
 * @param relation relationship type to look up
 * @param count page size sent to SGS
 * @param performUnion forwarded as LookupContext.performUnion
 * @param recentEdgesWindowOpt when set, a cursor derived from this window is sent and
 *                             `selectAll` is enabled on the page request
 * @param targets optional target user ids forwarded on the SrcRelationship
 */
case class EdgeRequestQuery(
userId: Long,
relation: RelationshipType,
count: Option[Int] = Some(SocialGraphClient.MaxQuerySize),
performUnion: Boolean = true,
recentEdgesWindowOpt: Option[Duration] = None,
targets: Option[Seq[Long]] = None)
/**
 * Client for the Social Graph Service (SGS).
 *
 * Most lookups exist in two flavours: one that calls SGS through the Stitch `SocialGraph`
 * client, and a `...FromCachedColumn` one that reads the same data through the Strato `Ids`
 * column. The variants fail open: the direct ones on `OverCapacity`, the column ones on any
 * `Exception`, returning an empty result instead of failing the Stitch.
 */
@Singleton
class SocialGraphClient @Inject() (
  socialGraph: SocialGraph,
  idsClientColumn: IdsClientColumn,
  statsReceiver: StatsReceiver = NullStatsReceiver)
    extends Logging {

  private val stats = statsReceiver.scope(this.getClass.getSimpleName)
  private val cacheStats = stats.scope("cache")
  private val getIntersectionsStats = stats.scope("getIntersections")
  private val getIntersectionsFromCachedColumnStats =
    stats.scope("getIntersectionsFromCachedColumn")
  private val getRecentEdgesStats = stats.scope("getRecentEdges")
  private val getRecentEdgesCachedStats = stats.scope("getRecentEdgesCached")
  private val getRecentEdgesFromCachedColumnStats = stats.scope("getRecentEdgesFromCachedColumn")
  private val getRecentEdgesCachedInternalStats = stats.scope("getRecentEdgesCachedInternal")
  private val getRecentEdgesWithTimeStats = stats.scope("getRecentEdgesWithTime")

  val sgsIdsFetcher: Fetcher[IdsRequest, Unit, IdsResult] = idsClientColumn.fetcher

  // In-process read-through cache in front of getRecentEdges, keyed by the whole query.
  private val recentEdgesCache = StitchCache[RecentEdgesQuery, Seq[Long]](
    maxCacheSize = SocialGraphClient.MaxCacheSize,
    ttl = SocialGraphClient.CacheTTL,
    statsReceiver = cacheStats,
    underlyingCall = getRecentEdges
  )

  /**
   * Builds the page request shared by all recent-edge lookups. With a window, a time cursor is
   * sent and `selectAll` is enabled; without one only the page size is set.
   */
  private def pageRequestFor(count: Option[Int], recentEdgesWindowOpt: Option[Duration]): PageRequest =
    recentEdgesWindowOpt match {
      case Some(recentEdgesWindow) =>
        PageRequest(
          count = count,
          cursor = Some(getEdgeCursor(recentEdgesWindow)),
          selectAll = Some(true)
        )
      case _ => PageRequest(count = count)
    }

  /** Builds the SGS ids request for a recent-edges query: one SrcRelationship per relation. */
  private def idsRequestFor(rq: RecentEdgesQuery): IdsRequest = {
    val pageRequest = pageRequestFor(rq.count, rq.recentEdgesWindowOpt)
    IdsRequest(
      rq.relations.map { relationshipType =>
        SrcRelationship(
          source = rq.userId,
          relationshipType = relationshipType,
          targets = rq.targets
        )
      },
      pageRequest = Some(pageRequest),
      context = Some(LookupContext(performUnion = Some(rq.performUnion)))
    )
  }

  /**
   * Builds the ids request used for social proof: the users `userId` follows combined with the
   * users `candidateId` is followed by, limited to `numIntersectionIds` ids.
   */
  private def intersectionRequestFor(
    userId: Long,
    candidateId: Long,
    numIntersectionIds: Int
  ): IdsRequest =
    IdsRequest(
      Seq(
        SrcRelationship(userId, RelationshipType.Following),
        SrcRelationship(candidateId, RelationshipType.FollowedBy)
      ),
      pageRequest = Some(PageRequest(count = Some(numIntersectionIds)))
    )

  /**
   * Returns the recent edges for `rq` from a cached source.
   *
   * @param useCachedStratoColumn when true (the default) the Strato column is read; otherwise
   *                              the in-process read-through cache in front of SGS is used
   */
  def getRecentEdgesCached(
    rq: RecentEdgesQuery,
    useCachedStratoColumn: Boolean = true
  ): Stitch[Seq[Long]] = {
    getRecentEdgesCachedStats.counter("requests").incr()
    if (useCachedStratoColumn) {
      getRecentEdgesFromCachedColumn(rq)
    } else {
      StatsUtil.profileStitch(
        getRecentEdgesCachedInternal(rq),
        getRecentEdgesCachedInternalStats
      )
    }
  }

  /** Reads the recent edges for `rq` through the in-process cache, calling SGS on a miss. */
  def getRecentEdgesCachedInternal(rq: RecentEdgesQuery): Stitch[Seq[Long]] = {
    recentEdgesCache.readThrough(rq)
  }

  /**
   * Reads the recent edge ids for `rq` through the Strato ids column.
   *
   * @return the ids of the result; empty when the column has no value or any Exception occurs
   */
  def getRecentEdgesFromCachedColumn(rq: RecentEdgesQuery): Stitch[Seq[Long]] = {
    val idsRequest = idsRequestFor(rq)
    val socialGraphStitch = sgsIdsFetcher
      // The column's view type is Unit, so the unit value is passed explicitly.
      .fetch(idsRequest, ())
      .map(_.v)
      .map { result =>
        result
          .map { idResult =>
            val userIds: Seq[Long] = idResult.ids
            getRecentEdgesFromCachedColumnStats.stat("num_edges").add(userIds.size)
            userIds
          }.getOrElse(Seq.empty)
      }
      .rescue {
        // Fail open: count the failure and return no edges.
        case e: Exception =>
          stats.counter(e.getClass.getSimpleName).incr()
          Stitch.Nil
      }
    StatsUtil.profileStitch(
      socialGraphStitch,
      getRecentEdgesFromCachedColumnStats
    )
  }

  /**
   * Reads the recent edge ids for `rq` directly from SGS.
   *
   * @return the ids of the result; empty when SGS reports OverCapacity
   */
  def getRecentEdges(rq: RecentEdgesQuery): Stitch[Seq[Long]] = {
    val socialGraphStitch = socialGraph
      .ids(idsRequestFor(rq))
      .map { idsResult =>
        val userIds: Seq[Long] = idsResult.ids
        getRecentEdgesStats.stat("num_edges").add(userIds.size)
        userIds
      }
      .rescue {
        case e: OverCapacity =>
          stats.counter(e.getClass.getSimpleName).incr()
          logger.warn("SGS Over Capacity", e)
          Stitch.Nil
      }
    StatsUtil.profileStitch(
      socialGraphStitch,
      getRecentEdgesStats
    )
  }

  /**
   * Reads the recent edges for `rq` from SGS and returns, for each edge, its target user id
   * together with the edge's `updatedAt` value (a time in milliseconds according to the
   * original comment).
   *
   * @return the edges; empty when SGS reports OverCapacity
   */
  def getRecentEdgesWithTime(rq: EdgeRequestQuery): Stitch[Seq[UserIdWithTimestamp]] = {
    val pageRequest = pageRequestFor(rq.count, rq.recentEdgesWindowOpt)
    val socialGraphStitch = socialGraph
      .edges(
        EdgesRequest(
          SrcRelationship(
            source = rq.userId,
            relationshipType = rq.relation,
            targets = rq.targets
          ),
          pageRequest = Some(pageRequest),
          context = Some(LookupContext(performUnion = Some(rq.performUnion)))
        )
      )
      .map { edgesResult =>
        val userIds = edgesResult.edges.map { socialEdge =>
          UserIdWithTimestamp(socialEdge.target, socialEdge.updatedAt)
        }
        getRecentEdgesWithTimeStats.stat("num_edges").add(userIds.size)
        userIds
      }
      .rescue {
        case e: OverCapacity =>
          stats.counter(e.getClass.getSimpleName).incr()
          logger.warn("SGS Over Capacity", e)
          Stitch.Nil
      }
    StatsUtil.profileStitch(
      socialGraphStitch,
      getRecentEdgesWithTimeStats
    )
  }

  /**
   * Returns the cursor for a time window, such that all edges returned by SGS were created in
   * the range (now - window, now).
   *
   * The cursor is the negated epoch-millisecond value of `now - window`, shifted left by 20
   * bits and encoded with `ByteBufferUtil.fromLong`.
   */
  def getEdgeCursor(window: Duration): ByteBuffer = {
    val cursorInLong = (-(Time.now - window).inMilliseconds) << 20
    ByteBufferUtil.fromLong(cursorInLong)
  }

  /**
   * Computes, for each candidate, the follow proof between `userId` and the candidate by
   * calling SGS once per candidate. Note that this is more expensive but more realtime than the
   * GFS one.
   *
   * @param numIntersectionIds maximum number of ids requested per candidate
   * @return candidate id -> proof; an empty map when SGS reports OverCapacity
   */
  def getIntersections(
    userId: Long,
    candidateIds: Seq[Long],
    numIntersectionIds: Int
  ): Stitch[Map[Long, FollowProof]] = {
    val socialGraphStitch: Stitch[Map[Long, FollowProof]] = Stitch
      .collect(candidateIds.map { candidateId =>
        socialGraph
          .ids(intersectionRequestFor(userId, candidateId, numIntersectionIds))
          .map { idsResult =>
            getIntersectionsStats.stat("num_edges").add(idsResult.ids.size)
            (candidateId -> FollowProof(idsResult.ids, idsResult.ids.size))
          }
      }).map(_.toMap)
      .rescue {
        case e: OverCapacity =>
          stats.counter(e.getClass.getSimpleName).incr()
          logger.warn("social graph over capacity in hydrating social proof", e)
          Stitch.value(Map.empty)
      }
    StatsUtil.profileStitch(
      socialGraphStitch,
      getIntersectionsStats
    )
  }

  /**
   * Same as [[getIntersections]] but reads through the Strato ids column. Candidates for which
   * the column has no value are left out of the result.
   *
   * @return candidate id -> proof; an empty map when any Exception occurs
   */
  def getIntersectionsFromCachedColumn(
    userId: Long,
    candidateIds: Seq[Long],
    numIntersectionIds: Int
  ): Stitch[Map[Long, FollowProof]] = {
    val socialGraphStitch: Stitch[Map[Long, FollowProof]] = Stitch
      .collect(candidateIds.map { candidateId =>
        sgsIdsFetcher
          // The column's view type is Unit, so the unit value is passed explicitly.
          .fetch(intersectionRequestFor(userId, candidateId, numIntersectionIds), ())
          .map(_.v)
          .map { resultOpt =>
            resultOpt.map { idsResult =>
              getIntersectionsFromCachedColumnStats.stat("num_edges").add(idsResult.ids.size)
              candidateId -> FollowProof(idsResult.ids, idsResult.ids.size)
            }
          }
      }).map(_.flatten.toMap)
      .rescue {
        case e: Exception =>
          stats.counter(e.getClass.getSimpleName).incr()
          Stitch.value(Map.empty)
      }
    StatsUtil.profileStitch(
      socialGraphStitch,
      getIntersectionsFromCachedColumnStats
    )
  }

  /**
   * Returns up to `maxNumRelationship` user ids that have any of
   * `SocialGraphClient.InvalidRelationshipTypes` with `userId`, read directly from SGS.
   */
  def getInvalidRelationshipUserIds(
    userId: Long,
    maxNumRelationship: Int = SocialGraphClient.MaxNumInvalidRelationship
  ): Stitch[Seq[Long]] = {
    getRecentEdges(
      RecentEdgesQuery(
        userId,
        SocialGraphClient.InvalidRelationshipTypes,
        Some(maxNumRelationship)
      )
    )
  }

  /** Same as [[getInvalidRelationshipUserIds]] but reads through the Strato ids column. */
  def getInvalidRelationshipUserIdsFromCachedColumn(
    userId: Long,
    maxNumRelationship: Int = SocialGraphClient.MaxNumInvalidRelationship
  ): Stitch[Seq[Long]] = {
    getRecentEdgesFromCachedColumn(
      RecentEdgesQuery(
        userId,
        SocialGraphClient.InvalidRelationshipTypes,
        Some(maxNumRelationship)
      )
    )
  }

  /** Returns the ids of the users `userId` follows (default page size), read from SGS. */
  def getRecentFollowedUserIds(userId: Long): Stitch[Seq[Long]] = {
    getRecentEdges(
      RecentEdgesQuery(
        userId,
        Seq(RelationshipType.Following)
      )
    )
  }

  /** Same as [[getRecentFollowedUserIds]] but reads through the Strato ids column. */
  def getRecentFollowedUserIdsFromCachedColumn(userId: Long): Stitch[Seq[Long]] = {
    getRecentEdgesFromCachedColumn(
      RecentEdgesQuery(
        userId,
        Seq(RelationshipType.Following)
      )
    )
  }

  /** Returns the users `userId` follows together with each edge's `updatedAt` value. */
  def getRecentFollowedUserIdsWithTime(userId: Long): Stitch[Seq[UserIdWithTimestamp]] = {
    getRecentEdgesWithTime(
      EdgeRequestQuery(
        userId,
        RelationshipType.Following
      )
    )
  }

  /** Returns the ids of the users that follow `userId` (default page size), read from SGS. */
  def getRecentFollowedByUserIds(userId: Long): Stitch[Seq[Long]] = {
    getRecentEdges(
      RecentEdgesQuery(
        userId,
        Seq(RelationshipType.FollowedBy)
      )
    )
  }

  /** Same as [[getRecentFollowedByUserIds]] but reads through the Strato ids column. */
  def getRecentFollowedByUserIdsFromCachedColumn(userId: Long): Stitch[Seq[Long]] = {
    getRecentEdgesFromCachedColumn(
      RecentEdgesQuery(
        userId,
        Seq(RelationshipType.FollowedBy)
      )
    )
  }

  /** Returns the ids of the users `userId` follows, restricted by a cursor for `timeWindow`. */
  def getRecentFollowedUserIdsWithTimeWindow(
    userId: Long,
    timeWindow: Duration
  ): Stitch[Seq[Long]] = {
    getRecentEdges(
      RecentEdgesQuery(
        userId,
        Seq(RelationshipType.Following),
        recentEdgesWindowOpt = Some(timeWindow)
      )
    )
  }
}
/** Limits, cache settings and relationship sets used by the SocialGraphClient class. */
object SocialGraphClient {
// Default page size of recent-edge queries.
val MaxQuerySize: Int = 500
// Maximum number of entries of the in-process recent-edges cache.
val MaxCacheSize: Int = 5000000
// Ref: src/thrift/com/twitter/socialgraph/social_graph_service.thrift
val MaxNumInvalidRelationship: Int = 5000
// Time-to-live of entries of the in-process recent-edges cache.
val CacheTTL: Duration = Duration.fromHours(24)
// Relationship types queried by getInvalidRelationshipUserIds.
val InvalidRelationshipTypes: Seq[RelationshipType] = Seq(
RelationshipType.HideRecommendations,
RelationshipType.Blocking,
RelationshipType.BlockedBy,
RelationshipType.Muting,
RelationshipType.MutedBy,
RelationshipType.ReportedAsSpam,
RelationshipType.ReportedAsSpamBy,
RelationshipType.ReportedAsAbuse,
RelationshipType.ReportedAsAbuseBy,
RelationshipType.FollowRequestOutgoing,
RelationshipType.Following,
RelationshipType.UsedToFollow,
)
/**
 * Decides whether to call SGS to validate each candidate, based on the number of invalid
 * relationship users prefetched during the request building step. This aims to not miss any
 * invalid candidates that were not filtered out in previous steps.
 * If the number is 0, this might be a fail-opened SGS call.
 * If the number is greater than or equal to MaxNumInvalidRelationship (5000), this could have
 * hit the SGS page size limit.
 * Both cases account for a small percentage of the total traffic (<5%).
 *
 * @param numInvalidRelationshipUsers number of invalid relationship users fetched from getInvalidRelationshipUserIds
 * @return whether to enable the post-ranker SGS predicate
 */
def enablePostRankerSgsPredicate(numInvalidRelationshipUsers: Int): Boolean = {
numInvalidRelationshipUsers == 0 || numInvalidRelationshipUsers >= MaxNumInvalidRelationship
}
}

View File

@ -1,25 +0,0 @@
package com.twitter.follow_recommendations.common.clients.socialgraph
import com.google.inject.Provides
import com.twitter.finagle.ThriftMux
import com.twitter.finatra.mtls.thriftmux.modules.MtlsClient
import com.twitter.follow_recommendations.common.clients.common.BaseClientModule
import com.twitter.socialgraph.thriftscala.SocialGraphService
import com.twitter.stitch.socialgraph.SocialGraph
import javax.inject.Singleton
/**
 * Guice module for the SocialGraphService thrift client, built on BaseClientModule with the
 * MtlsClient mix-in. It additionally provides the Stitch-based SocialGraph client.
 */
object SocialGraphModule
extends BaseClientModule[SocialGraphService.MethodPerEndpoint]
with MtlsClient {
override val label = "social-graph-service"
override val dest = "/s/socialgraph/socialgraph"
// Turns fail-fast off for this client's sessions.
override def configureThriftMuxClient(client: ThriftMux.Client): ThriftMux.Client =
client.withSessionQualifier.noFailFast
/** Wraps the injected thrift interface in a Stitch SocialGraph client (singleton). */
@Provides
@Singleton
def providesStitchClient(futureIface: SocialGraphService.MethodPerEndpoint): SocialGraph = {
SocialGraph(futureIface)
}
}

View File

@ -1,30 +0,0 @@
# Scala library target.
# NOTE(review): presumably the BUILD target of the strato client package (the sources that
# follow it in this listing) — confirm.
# NOTE(review): unlike sibling targets that declare sources = ["*.scala"], this target sets no
# `sources` attribute; presumably the build tool's default is relied on — confirm.
scala_library(
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/com/google/inject/extensions:guice-assistedinject",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/slf4j:slf4j-api",
"finatra/inject/inject-core/src/main/scala",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/base",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/constants",
"src/scala/com/twitter/onboarding/relevance/candidate_generation/utt/models",
# Thrift definitions of the value types read from Strato columns.
"src/thrift/com/twitter/core_workflows/user_model:user_model-scala",
"src/thrift/com/twitter/frigate/data_pipeline:frigate-user-history-thrift-scala",
"src/thrift/com/twitter/hermit/candidate:hermit-candidate-scala",
"src/thrift/com/twitter/hermit/pop_geo:hermit-pop-geo-scala",
"src/thrift/com/twitter/onboarding/relevance/relatable_accounts:relatable_accounts-scala",
"src/thrift/com/twitter/onboarding/relevance/store:store-scala",
"src/thrift/com/twitter/recos/user_user_graph:user_user_graph-scala",
"src/thrift/com/twitter/search/account_search/extended_network:extended_network_users-scala",
"src/thrift/com/twitter/service/metastore/gen:thrift-scala",
"src/thrift/com/twitter/wtf/candidate:wtf-candidate-scala",
"src/thrift/com/twitter/wtf/ml:wtf-ml-output-thrift-scala",
"src/thrift/com/twitter/wtf/real_time_interaction_graph:wtf-real_time_interaction_graph-thrift-scala",
"src/thrift/com/twitter/wtf/triangular_loop:triangular_loop-scala",
"strato/src/main/scala/com/twitter/strato/client",
"util/util-slf4j-api/src/main/scala",
],
)

View File

@ -1,249 +0,0 @@
package com.twitter.follow_recommendations.common.clients.strato
import com.google.inject.name.Named
import com.google.inject.Provides
import com.google.inject.Singleton
import com.twitter.conversions.DurationOps._
import com.twitter.core_workflows.user_model.thriftscala.CondensedUserState
import com.twitter.search.account_search.extended_network.thriftscala.ExtendedNetworkUserKey
import com.twitter.search.account_search.extended_network.thriftscala.ExtendedNetworkUserVal
import com.twitter.finagle.ThriftMux
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.thrift.Protocols
import com.twitter.follow_recommendations.common.constants.GuiceNamedConstants
import com.twitter.follow_recommendations.common.constants.ServiceConstants._
import com.twitter.frigate.data_pipeline.candidate_generation.thriftscala.LatestEvents
import com.twitter.hermit.candidate.thriftscala.{Candidates => HermitCandidates}
import com.twitter.hermit.pop_geo.thriftscala.PopUsersInPlace
import com.twitter.onboarding.relevance.relatable_accounts.thriftscala.RelatableAccounts
import com.twitter.inject.TwitterModule
import com.twitter.onboarding.relevance.candidates.thriftscala.InterestBasedUserRecommendations
import com.twitter.onboarding.relevance.candidates.thriftscala.UTTInterest
import com.twitter.onboarding.relevance.store.thriftscala.WhoToFollowDismissEventDetails
import com.twitter.recos.user_user_graph.thriftscala.RecommendUserRequest
import com.twitter.recos.user_user_graph.thriftscala.RecommendUserResponse
import com.twitter.service.metastore.gen.thriftscala.UserRecommendabilityFeatures
import com.twitter.strato.catalog.Scan.Slice
import com.twitter.strato.client.Strato.{Client => StratoClient}
import com.twitter.strato.client.Client
import com.twitter.strato.client.Fetcher
import com.twitter.strato.client.Scanner
import com.twitter.strato.thrift.ScroogeConvImplicits._
import com.twitter.wtf.candidate.thriftscala.CandidateSeq
import com.twitter.wtf.ml.thriftscala.CandidateFeatures
import com.twitter.wtf.real_time_interaction_graph.thriftscala.Interaction
import com.twitter.wtf.triangular_loop.thriftscala.{Candidates => TriangularLoopCandidates}
import com.twitter.strato.opcontext.Attribution._
object StratoClientModule extends TwitterModule {
// column paths
val CosineFollowPath = "recommendations/similarity/similarUsersByFollowGraph.User"
val CosineListPath = "recommendations/similarity/similarUsersByListGraph.User"
val CuratedCandidatesPath = "onboarding/curatedAccounts"
val CuratedFilteredAccountsPath = "onboarding/filteredAccountsFromRecommendations"
val PopUsersInPlacePath = "onboarding/userrecs/popUsersInPlace"
val ProfileSidebarBlacklistPath = "recommendations/hermit/profile-sidebar-blacklist"
val RealTimeInteractionsPath = "hmli/realTimeInteractions"
val SimsPath = "recommendations/similarity/similarUsersBySims.User"
val DBV2SimsPath = "onboarding/userrecs/newSims.User"
val TriangularLoopsPath = "onboarding/userrecs/triangularLoops.User"
val TwoHopRandomWalkPath = "onboarding/userrecs/twoHopRandomWalk.User"
val UserRecommendabilityPath = "onboarding/userRecommendabilityWithLongKeys.User"
val UTTAccountRecommendationsPath = "onboarding/userrecs/utt_account_recommendations"
val UttSeedAccountsRecommendationPath = "onboarding/userrecs/utt_seed_accounts"
val UserStatePath = "onboarding/userState.User"
val WTFPostNuxFeaturesPath = "ml/featureStore/onboarding/wtfPostNuxFeatures.User"
val ElectionCandidatesPath = "onboarding/electionAccounts"
val UserUserGraphPath = "recommendations/userUserGraph"
val WtfDissmissEventsPath = "onboarding/wtfDismissEvents"
val RelatableAccountsPath = "onboarding/userrecs/relatableAccounts"
val ExtendedNetworkCandidatesPath = "search/account_search/extendedNetworkCandidatesMH"
val LabeledNotificationPath = "frigate/magicrecs/labeledPushRecsAggregated.User"
@Provides
@Singleton
def stratoClient(serviceIdentifier: ServiceIdentifier): Client = {
val timeoutBudget = 500.milliseconds
StratoClient(
ThriftMux.client
.withRequestTimeout(timeoutBudget)
.withProtocolFactory(Protocols.binaryFactory(
stringLengthLimit = StringLengthLimit,
containerLengthLimit = ContainerLengthLimit)))
.withMutualTls(serviceIdentifier)
.build()
}
// add strato putters, fetchers, scanners below:
@Provides
@Singleton
@Named(GuiceNamedConstants.COSINE_FOLLOW_FETCHER)
def cosineFollowFetcher(stratoClient: Client): Fetcher[Long, Unit, HermitCandidates] =
stratoClient.fetcher[Long, Unit, HermitCandidates](CosineFollowPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.COSINE_LIST_FETCHER)
def cosineListFetcher(stratoClient: Client): Fetcher[Long, Unit, HermitCandidates] =
stratoClient.fetcher[Long, Unit, HermitCandidates](CosineListPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.CURATED_COMPETITOR_ACCOUNTS_FETCHER)
def curatedBlacklistedAccountsFetcher(stratoClient: Client): Fetcher[String, Unit, Seq[Long]] =
stratoClient.fetcher[String, Unit, Seq[Long]](CuratedFilteredAccountsPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.CURATED_CANDIDATES_FETCHER)
def curatedCandidatesFetcher(stratoClient: Client): Fetcher[String, Unit, Seq[Long]] =
stratoClient.fetcher[String, Unit, Seq[Long]](CuratedCandidatesPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.POP_USERS_IN_PLACE_FETCHER)
def popUsersInPlaceFetcher(stratoClient: Client): Fetcher[String, Unit, PopUsersInPlace] =
stratoClient.fetcher[String, Unit, PopUsersInPlace](PopUsersInPlacePath)
@Provides
@Singleton
@Named(GuiceNamedConstants.RELATABLE_ACCOUNTS_FETCHER)
def relatableAccountsFetcher(stratoClient: Client): Fetcher[String, Unit, RelatableAccounts] =
stratoClient.fetcher[String, Unit, RelatableAccounts](RelatableAccountsPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.PROFILE_SIDEBAR_BLACKLIST_SCANNER)
def profileSidebarBlacklistScanner(
stratoClient: Client
): Scanner[(Long, Slice[Long]), Unit, (Long, Long), Unit] =
stratoClient.scanner[(Long, Slice[Long]), Unit, (Long, Long), Unit](ProfileSidebarBlacklistPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.REAL_TIME_INTERACTIONS_FETCHER)
def realTimeInteractionsFetcher(
stratoClient: Client
): Fetcher[(Long, Long), Unit, Seq[Interaction]] =
stratoClient.fetcher[(Long, Long), Unit, Seq[Interaction]](RealTimeInteractionsPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.SIMS_FETCHER)
def simsFetcher(stratoClient: Client): Fetcher[Long, Unit, HermitCandidates] =
stratoClient.fetcher[Long, Unit, HermitCandidates](SimsPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.DBV2_SIMS_FETCHER)
def dbv2SimsFetcher(stratoClient: Client): Fetcher[Long, Unit, HermitCandidates] =
stratoClient.fetcher[Long, Unit, HermitCandidates](DBV2SimsPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.TRIANGULAR_LOOPS_FETCHER)
def triangularLoopsFetcher(stratoClient: Client): Fetcher[Long, Unit, TriangularLoopCandidates] =
stratoClient.fetcher[Long, Unit, TriangularLoopCandidates](TriangularLoopsPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.TWO_HOP_RANDOM_WALK_FETCHER)
def twoHopRandomWalkFetcher(stratoClient: Client): Fetcher[Long, Unit, CandidateSeq] =
stratoClient.fetcher[Long, Unit, CandidateSeq](TwoHopRandomWalkPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.USER_RECOMMENDABILITY_FETCHER)
def userRecommendabilityFetcher(
stratoClient: Client
): Fetcher[Long, Unit, UserRecommendabilityFeatures] =
stratoClient.fetcher[Long, Unit, UserRecommendabilityFeatures](UserRecommendabilityPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.USER_STATE_FETCHER)
def userStateFetcher(stratoClient: Client): Fetcher[Long, Unit, CondensedUserState] =
stratoClient.fetcher[Long, Unit, CondensedUserState](UserStatePath)
@Provides
@Singleton
@Named(GuiceNamedConstants.UTT_ACCOUNT_RECOMMENDATIONS_FETCHER)
def uttAccountRecommendationsFetcher(
stratoClient: Client
): Fetcher[UTTInterest, Unit, InterestBasedUserRecommendations] =
stratoClient.fetcher[UTTInterest, Unit, InterestBasedUserRecommendations](
UTTAccountRecommendationsPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.UTT_SEED_ACCOUNTS_FETCHER)
def uttSeedAccountRecommendationsFetcher(
stratoClient: Client
): Fetcher[UTTInterest, Unit, InterestBasedUserRecommendations] =
stratoClient.fetcher[UTTInterest, Unit, InterestBasedUserRecommendations](
UttSeedAccountsRecommendationPath)
@Provides
@Singleton
@Named(GuiceNamedConstants.ELECTION_CANDIDATES_FETCHER)
def electionCandidatesFetcher(stratoClient: Client): Fetcher[String, Unit, Seq[Long]] =
stratoClient.fetcher[String, Unit, Seq[Long]](ElectionCandidatesPath)
/**
 * Singleton Strato fetcher bound under [[GuiceNamedConstants.USER_USER_GRAPH_FETCHER]].
 *
 * @param stratoClient Strato client the fetcher is created from
 * @return fetcher for `UserUserGraphPath`, from a [[RecommendUserRequest]] key to a
 *         [[RecommendUserResponse]]
 */
@Provides
@Singleton
@Named(GuiceNamedConstants.USER_USER_GRAPH_FETCHER)
def userUserGraphFetcher(
  stratoClient: Client
): Fetcher[RecommendUserRequest, Unit, RecommendUserResponse] = {
  stratoClient.fetcher[RecommendUserRequest, Unit, RecommendUserResponse](UserUserGraphPath)
}
/**
 * Singleton Strato fetcher bound under [[GuiceNamedConstants.POST_NUX_WTF_FEATURES_FETCHER]].
 *
 * Unlike the sibling providers, the fetcher is returned with an explicit Manhattan
 * attribution attached.
 *
 * @param stratoClient Strato client the fetcher is created from
 * @return attributed fetcher for `WTFPostNuxFeaturesPath`, from a `Long` key to
 *         [[CandidateFeatures]]
 */
@Provides
@Singleton
@Named(GuiceNamedConstants.POST_NUX_WTF_FEATURES_FETCHER)
def wtfPostNuxFeaturesFetcher(stratoClient: Client): Fetcher[Long, Unit, CandidateFeatures] = {
  val manhattanAppId = ManhattanAppId("starbuck", "wtf_starbuck")
  val unattributedFetcher =
    stratoClient.fetcher[Long, Unit, CandidateFeatures](WTFPostNuxFeaturesPath)
  unattributedFetcher.withAttribution(manhattanAppId)
}
/**
 * Singleton Strato fetcher bound under [[GuiceNamedConstants.EXTENDED_NETWORK]].
 *
 * @param stratoClient Strato client the fetcher is created from
 * @return fetcher for `ExtendedNetworkCandidatesPath`, from an [[ExtendedNetworkUserKey]] to an
 *         [[ExtendedNetworkUserVal]]
 */
@Provides
@Singleton
@Named(GuiceNamedConstants.EXTENDED_NETWORK)
def extendedNetworkFetcher(
  stratoClient: Client
): Fetcher[ExtendedNetworkUserKey, Unit, ExtendedNetworkUserVal] =
  stratoClient.fetcher[ExtendedNetworkUserKey, Unit, ExtendedNetworkUserVal](
    ExtendedNetworkCandidatesPath)
/**
 * Singleton Strato scanner bound under [[GuiceNamedConstants.DISMISS_STORE_SCANNER]].
 *
 * @param stratoClient Strato client the scanner is created from
 * @return scanner for `WtfDissmissEventsPath` yielding [[WhoToFollowDismissEventDetails]]
 */
@Provides
@Singleton
@Named(GuiceNamedConstants.DISMISS_STORE_SCANNER)
def dismissStoreScanner(
  stratoClient: Client
): Scanner[
  (Long, Slice[(Long, Long)]),
  Unit,
  (Long, (Long, Long)),
  WhoToFollowDismissEventDetails
] = {
  // Scan key layout — PKEY: userId, LKEY: (-ts, candidateId)
  stratoClient.scanner[
    (Long, Slice[(Long, Long)]),
    Unit,
    (Long, (Long, Long)),
    WhoToFollowDismissEventDetails
  ](WtfDissmissEventsPath)
}
/**
 * Singleton Strato fetcher bound under [[GuiceNamedConstants.LABELED_NOTIFICATION_FETCHER]].
 *
 * @param stratoClient Strato client the fetcher is created from
 * @return fetcher for `LabeledNotificationPath`, from a `Long` key to [[LatestEvents]]
 */
@Provides
@Singleton
@Named(GuiceNamedConstants.LABELED_NOTIFICATION_FETCHER)
def labeledNotificationFetcher(stratoClient: Client): Fetcher[Long, Unit, LatestEvents] =
  stratoClient.fetcher[Long, Unit, LatestEvents](LabeledNotificationPath)
}

View File

@ -1,17 +0,0 @@
# Scala library target that compiles every *.scala file in this directory.
# The "fatal_warnings" option set presumably turns compiler warnings into build errors — confirm
# against the build configuration.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
# Third-party: dependency injection and logging.
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/slf4j:slf4j-api",
# Sibling follow-recommendations packages (cache client, strato client, deciders).
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/cache",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/strato",
"follow-recommendations-service/server/src/main/scala/com/twitter/follow_recommendations/configapi/deciders",
# Stitch / Strato client libraries and thrift definitions.
"stitch/stitch-core",
"strato/src/main/scala/com/twitter/strato/client",
"user-signal-service/thrift/src/main/thrift:thrift-scala",
],
)

View File

@ -1,83 +0,0 @@
package com.twitter.follow_recommendations.common.clients.user_state
import com.google.inject.name.Named
import com.twitter.conversions.DurationOps._
import com.twitter.core_workflows.user_model.thriftscala.CondensedUserState
import com.twitter.core_workflows.user_model.thriftscala.UserState
import com.twitter.decider.Decider
import com.twitter.decider.RandomRecipient
import com.twitter.finagle.Memcached.Client
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.util.DefaultTimer
import com.twitter.follow_recommendations.common.base.StatsUtil
import com.twitter.follow_recommendations.common.clients.cache.MemcacheClient
import com.twitter.follow_recommendations.common.clients.cache.ThriftEnumOptionBijection
import com.twitter.follow_recommendations.common.constants.GuiceNamedConstants
import com.twitter.follow_recommendations.configapi.deciders.DeciderKey
import com.twitter.stitch.Stitch
import com.twitter.strato.client.Fetcher
import com.twitter.util.Duration
import javax.inject.Inject
import javax.inject.Singleton
import java.lang.{Long => JLong}
/**
 * Looks up a user's [[UserState]] through the Strato fetcher bound as
 * [[GuiceNamedConstants.USER_STATE_FETCHER]], optionally reading through a memcache layer.
 *
 * @param userStateFetcher Strato fetcher returning the [[CondensedUserState]] for a user id
 * @param client           memcached client backing the read-through cache
 * @param statsReceiver    parent stats receiver; everything is scoped under `user_state_client`
 * @param decider          gates the distributed cache; defaults to [[Decider.False]]
 */
@Singleton
class UserStateClient @Inject() (
  @Named(GuiceNamedConstants.USER_STATE_FETCHER)
  userStateFetcher: Fetcher[Long, Unit, CondensedUserState],
  client: Client,
  statsReceiver: StatsReceiver,
  decider: Decider = Decider.False) {

  private val stats: StatsReceiver = statsReceiver.scope("user_state_client")

  // Serialization used for the cached Option[UserState] values.
  val bijection = new ThriftEnumOptionBijection[UserState](UserState.apply)

  // Client to the memcache cluster.
  val memcacheClient = MemcacheClient[Option[UserState]](
    client = client,
    dest = "/s/cache/follow_recos_service:twemcaches",
    valueBijection = bijection,
    ttl = UserStateClient.CacheTTL,
    statsReceiver = stats.scope("twemcache")
  )

  /**
   * Returns the user state for `userId`, or `None` when it is absent or the lookup fails.
   *
   * The lookup is capped at 150ms. Any `Exception` (timeouts included) is counted under the
   * `rescued` scope by its simple class name and replaced with `None`. Latency of the whole
   * call is profiled under the `getUserState` scope.
   */
  def getUserState(userId: Long): Stitch[Option[UserState]] = {
    val useDistributedCache: Boolean = decider.isAvailable(
      DeciderKey.EnableDistributedCaching.toString,
      Some(RandomRecipient)
    )

    val lookup: Stitch[Option[UserState]] =
      if (useDistributedCache) {
        // Read through memcache; the key is prefixed to avoid cache key collisions.
        memcacheClient.readThrough(
          key = "UserStateClient" + userId.toString,
          underlyingCall = () => fetchUserState(userId)
        )
      } else {
        fetchUserState(userId)
      }

    val boundedLookup: Stitch[Option[UserState]] =
      lookup
        .within(150.milliseconds)(DefaultTimer)
        .rescue {
          case e: Exception =>
            stats.scope("rescued").counter(e.getClass.getSimpleName).incr()
            Stitch(None)
        }

    StatsUtil.profileStitch(boundedLookup, stats.scope("getUserState"))
  }

  /** Fetches the user state straight from Strato, bypassing the cache. */
  def fetchUserState(userId: JLong): Stitch[Option[UserState]] =
    userStateFetcher.fetch(userId).map(_.v.flatMap(_.userState))
}

object UserStateClient {

  /** Time-to-live applied to entries written to the memcache layer. */
  val CacheTTL: Duration = Duration.fromHours(6)
}

View File

@ -1,9 +0,0 @@
# Scala library target with no explicit `sources` or `name`.
# NOTE(review): presumably relies on the build tool's defaults for both — confirm.
scala_library(
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
"util/util-core/src/main/scala/com/twitter/conversions",
],
)

View File

@ -1,91 +0,0 @@
package com.twitter.follow_recommendations.common.constants
import com.twitter.hermit.constants.AlgorithmFeedbackTokens.AlgorithmToFeedbackTokenMap
import com.twitter.hermit.model.Algorithm._
import com.twitter.follow_recommendations.common.models.AlgorithmType
/**
 * Classifies candidate-generation algorithms by the kind of user information they rely on.
 *
 * Each algorithm is based on one, or more, of the 4 types of information we have on users,
 * described in [[AlgorithmType]]. Algorithms are identified by the string form of their
 * feedback token from `AlgorithmToFeedbackTokenMap`.
 */
object CandidateAlgorithmTypeConstants {

  /**
   * Builds table entries that assign the same set of `types` to every algorithm listed.
   *
   * Looking an algorithm up in `AlgorithmToFeedbackTokenMap` throws if it has no feedback token,
   * so a missing token surfaces when this object is initialised.
   */
  private def entries(types: AlgorithmType.Value*)(
    algorithms: Algorithm*
  ): Seq[(String, Set[AlgorithmType.Value])] = {
    val typeSet: Set[AlgorithmType.Value] = types.toSet
    algorithms.map(algorithm => AlgorithmToFeedbackTokenMap(algorithm).toString -> typeSet)
  }

  // Activity Algorithms:
  private val ActivityEntries = entries(AlgorithmType.Activity)(
    NewFollowingSimilarUser,
    Sims,
    NewFollowingSimilarUserSalsa,
    RecentEngagementNonDirectFollow,
    RecentEngagementSimilarUser,
    RecentEngagementSarusOcCur,
    RecentSearchBasedRec,
    TwistlyTweetAuthors,
    Follow2VecNearestNeighbors,
    EmailTweetClick,
    RepeatedProfileVisits,
    GoodTweetClickEngagements,
    TweetShareEngagements,
    TweetSharerToShareRecipientEngagements,
    TweetAuthorToShareRecipientEngagements,
    LinearRegressionFollow2VecNearestNeighbors,
    NUXLOHistory,
    TrafficAttributionAccounts,
    RealGraphOonV2,
    MagicRecsRecentEngagements,
    NotificationEngagement
  )

  // Social Algorithms:
  private val SocialEntries = entries(AlgorithmType.Social)(
    TwoHopRandomWalk,
    RealTimeMutualFollow,
    ForwardPhoneBook,
    ForwardEmailBook,
    NewFollowingNewFollowingExpansion,
    NewFollowingSarusCoOccurSocialProof,
    ReverseEmailBookIbis,
    ReversePhoneBook,
    StrongTiePredictionRec,
    StrongTiePredictionRecWithSocialProof,
    OnlineStrongTiePredictionRec,
    OnlineStrongTiePredictionRecNoCaching,
    TriangularLoop,
    StrongTiePredictionPmi,
    OnlineStrongTiePredictionRAB
  )

  // Geo Algorithms (PopGeohashRealGraph is intentionally left out, as in the original table):
  private val GeoEntries = entries(AlgorithmType.Geo)(
    PopCountryBackFill,
    PopCountry,
    PopGeohash,
    // PopGeohashRealGraph,
    EngagedFollowerRatio,
    CrowdSearchAccounts,
    OrganicFollowAccounts,
    PopGeohashQualityFollow,
    PPMILocaleFollow
  )

  // Interest Algorithms:
  private val InterestEntries = entries(AlgorithmType.Interest)(
    TttInterest,
    UttInterestRelatedUsers,
    UttSeedAccounts,
    UttProducerExpansion
  )

  // Hybrid (more than one type) Algorithms:
  private val HybridEntries =
    entries(AlgorithmType.Interest, AlgorithmType.Geo)(UttProducerOfflineMbcgV1, CuratedAccounts) ++
      entries(AlgorithmType.Social, AlgorithmType.Activity)(UserUserGraph)

  /** Feedback token (as a string) -> algorithm types; later entries win on duplicate tokens. */
  private val AlgorithmIdToType: Map[String, Set[AlgorithmType.Value]] =
    (ActivityEntries ++ SocialEntries ++ GeoEntries ++ InterestEntries ++ HybridEntries).toMap

  /**
   * Returns the names of the algorithm types registered for `algoId`.
   *
   * @param algoId feedback token of the algorithm, as a string
   * @return the `toString` of every [[AlgorithmType]] assigned to it, or an empty set when the
   *         token is not in the table
   */
  def getAlgorithmTypes(algoId: String): Set[String] =
    AlgorithmIdToType.get(algoId).fold(Set.empty[String])(_.map(_.toString))
}

View File

@ -1,43 +0,0 @@
package com.twitter.follow_recommendations.common.constants
/**
 * Binding names used with `@Named` to tell apart injected instances of the same type.
 *
 * Every member is a `final val` without a type ascription so that it stays a compile-time
 * constant, which is what allows it to be used as an annotation argument.
 */
object GuiceNamedConstants {

  // feature switches
  final val PRODUCER_SIDE_FEATURE_SWITCHES = "PRODUCER_SIDE_FEATURE_SWITCHES"

  // loggers
  final val CLIENT_EVENT_LOGGER = "CLIENT_EVENT_LOGGER"
  final val REQUEST_LOGGER = "REQUEST_LOGGER"
  final val FLOW_LOGGER = "FLOW_LOGGER"

  // fetchers
  final val COSINE_FOLLOW_FETCHER = "cosine_follow_fetcher"
  final val COSINE_LIST_FETCHER = "cosine_list_fetcher"
  final val CURATED_CANDIDATES_FETCHER = "curated_candidates_fetcher"
  final val CURATED_COMPETITOR_ACCOUNTS_FETCHER = "curated_competitor_accounts_fetcher"
  final val POP_USERS_IN_PLACE_FETCHER = "pop_users_in_place_fetcher"
  final val REAL_TIME_INTERACTIONS_FETCHER = "real_time_interactions_fetcher"
  final val SIMS_FETCHER = "sims_fetcher"
  final val DBV2_SIMS_FETCHER = "dbv2_sims_fetcher"
  final val TRIANGULAR_LOOPS_FETCHER = "triangular_loops_fetcher"
  final val TWO_HOP_RANDOM_WALK_FETCHER = "two_hop_random_walk_fetcher"
  final val USER_RECOMMENDABILITY_FETCHER = "user_recommendability_fetcher"
  final val USER_STATE_FETCHER = "user_state_fetcher"
  // NOTE(review): the value is spelled "recomendations"; kept as-is because it is a runtime name.
  final val UTT_ACCOUNT_RECOMMENDATIONS_FETCHER = "utt_account_recomendations_fetcher"
  final val UTT_SEED_ACCOUNTS_FETCHER = "utt_seed_accounts_fetcher"
  final val ELECTION_CANDIDATES_FETCHER = "election_candidates_fetcher"
  final val POST_NUX_WTF_FEATURES_FETCHER = "post_nux_wtf_features_fetcher"
  final val USER_USER_GRAPH_FETCHER = "user_user_graph_fetcher"
  // NOTE(review): the value ends in "_scanner" although the constant names a fetcher; kept as-is.
  final val LABELED_NOTIFICATION_FETCHER = "labeled_notification_scanner"
  final val EXTENDED_NETWORK = "extended_network_candidates"

  // scanners
  final val PROFILE_SIDEBAR_BLACKLIST_SCANNER = "profile_sidebar_blacklist_scanner"
  final val DISMISS_STORE_SCANNER = "dismiss_store_scanner"

  // scorers
  final val STP_EP_SCORER = "stp_ep_scorer"
  final val STP_DBV2_SCORER = "stp_dbv2_scorer"
  final val STP_RAB_DBV2_SCORER = "stp_rab_dbv2_scorer"

  // scoring client constants
  final val WTF_PROD_DEEPBIRDV2_CLIENT = "wtf_prod_deepbirdv2_client"

  // ann clients
  final val RELATABLE_ACCOUNTS_FETCHER = "relatable_accounts_fetcher"
}

View File

@ -1,15 +0,0 @@
package com.twitter.follow_recommendations.common.constants
import com.twitter.conversions.StorageUnitOps._
/**
 * Size limits applied to thrift client responses.
 *
 * The limits were estimated from a monitoring dashboard:
 * 3MB network usage per second / 25 rps ~ 120KB/req << 1MB.
 * Some headroom is left in case certain requests need more data than others.
 */
object ServiceConstants {

  /** Upper bound on string length, in bytes. */
  // NOTE(review): the estimate above compares against 1MB while this limit is 10MB — confirm.
  val StringLengthLimit: Long = 10.megabyte.inBytes

  /** Upper bound on container length, expressed as the byte count of one megabyte. */
  val ContainerLengthLimit: Long = 1.megabyte.inBytes
}

View File

@ -1,82 +0,0 @@
# Unnamed library with no sources of its own: it only depends on the four adapter libraries
# declared further down in this BUILD file.
scala_library(
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
":candidate-algorithm-adapter",
":client-context-adapter",
":post-nux-algorithm-adapter",
":pre-fetched-feature-adapter",
],
)
# Dependency bundle shared by every adapter library in this file (each one lists ":common").
target(
name = "common",
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/org/slf4j:slf4j-api",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/base",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/clients/strato",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/constants",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/feature_hydration/common",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/utils",
"src/java/com/twitter/ml/api:api-base",
"src/scala/com/twitter/ml/api/util",
"src/scala/com/twitter/onboarding/relevance/util/metadata",
"util/util-slf4j-api/src/main/scala",
],
)
# Library for CandidateAlgorithmAdapter.scala; adds the hermit constants package on top of
# the shared ":common" dependencies.
scala_library(
name = "candidate-algorithm-adapter",
sources = [
"CandidateAlgorithmAdapter.scala",
],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
":common",
"hermit/hermit-core/src/main/scala/com/twitter/hermit/constants",
],
)
# Library for ClientContextAdapter.scala; adds the snowflake id package on top of the shared
# ":common" dependencies.
scala_library(
name = "client-context-adapter",
sources = [
"ClientContextAdapter.scala",
],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
":common",
"snowflake/src/main/scala/com/twitter/snowflake/id",
],
)
# Library for PostNuxAlgorithmAdapter.scala; adds the post-nux aggregate feature catalog on top
# of the shared ":common" dependencies.
# NOTE(review): unlike the two targets above, this one does not set
# compiler_option_sets = ["fatal_warnings"] — confirm whether that is intentional.
scala_library(
name = "post-nux-algorithm-adapter",
sources = [
"PostNuxAlgorithmAdapter.scala",
],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
":common",
"src/scala/com/twitter/ml/featurestore/catalog/features/customer_journey:post-nux-algorithm-aggregate",
],
)
# Library for PreFetchedFeatureAdapter.scala; needs only the shared ":common" dependencies.
# NOTE(review): this target does not set compiler_option_sets = ["fatal_warnings"] — confirm
# whether that is intentional.
scala_library(
name = "pre-fetched-feature-adapter",
sources = [
"PreFetchedFeatureAdapter.scala",
],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
":common",
],
)

View File

@ -1,72 +0,0 @@
package com.twitter.follow_recommendations.common.feature_hydration.adapters
import com.twitter.follow_recommendations.common.models.UserCandidateSourceDetails
import com.twitter.hermit.constants.AlgorithmFeedbackTokens.AlgorithmToFeedbackTokenMap
import com.twitter.hermit.model.Algorithm
import com.twitter.hermit.model.Algorithm.Algorithm
import com.twitter.hermit.model.Algorithm.UttProducerOfflineMbcgV1
import com.twitter.hermit.model.Algorithm.UttProducerOnlineMbcgV1
import com.twitter.ml.api.DataRecord
import com.twitter.ml.api.Feature.SparseBinary
import com.twitter.ml.api.Feature.SparseContinuous
import com.twitter.ml.api.FeatureContext
import com.twitter.ml.api.IRecordOneToOneAdapter
import com.twitter.ml.api.util.FDsl._
/**
 * Converts the candidate-source details of a recommended user into a [[DataRecord]] holding
 * the ids of the algorithms that produced the candidate, plus per-algorithm scores and ranks.
 *
 * Algorithms are identified by the string form of their feedback token.
 */
object CandidateAlgorithmAdapter
    extends IRecordOneToOneAdapter[Option[UserCandidateSourceDetails]] {

  val CANDIDATE_ALGORITHMS: SparseBinary = new SparseBinary("candidate.source.algorithm_ids")
  val CANDIDATE_SOURCE_SCORES: SparseContinuous = new SparseContinuous("candidate.source.scores")
  val CANDIDATE_SOURCE_RANKS: SparseContinuous = new SparseContinuous("candidate.source.ranks")

  override val getFeatureContext: FeatureContext = new FeatureContext(
    CANDIDATE_ALGORITHMS,
    CANDIDATE_SOURCE_SCORES,
    CANDIDATE_SOURCE_RANKS
  )

  /**
   * Remaps experimental candidate sources onto their prod counterparts so that no separate
   * features are created for experimental sources. The left-hand side of each case is the
   * experimental source and the right-hand side is the prod source.
   */
  def remapCandidateSource(a: Algorithm): Algorithm = a match {
    case UttProducerOnlineMbcgV1 => UttProducerOfflineMbcgV1
    case _ => a
  }

  /**
   * Resolves a candidate source name to the string form of its (remapped) feedback token.
   * Yields `None` when the name is not a known algorithm or the algorithm has no token.
   */
  private def feedbackTokenId(sourceName: String): Option[String] =
    Algorithm
      .withNameOpt(sourceName)
      .flatMap(algorithm => AlgorithmToFeedbackTokenMap.get(remapCandidateSource(algorithm)))
      .map(_.toString)

  /**
   * Adds the algorithm feedback tokens as a sparse binary feature, together with the scores and
   * ranks keyed by the same tokens. Each feature is only set when it has at least one entry;
   * `None` input yields an empty record.
   */
  override def adaptToDataRecord(
    userCandidateSourceDetailsOpt: Option[UserCandidateSourceDetails]
  ): DataRecord = {
    val dataRecord = new DataRecord()

    userCandidateSourceDetailsOpt.foreach { details =>
      // Sources without a score are skipped.
      val scoresByToken = for {
        (source, scoreOpt) <- details.candidateSourceScores
        score <- scoreOpt
        tokenId <- feedbackTokenId(source.name)
      } yield tokenId -> score

      val ranksByToken = for {
        (source, rank) <- details.candidateSourceRanks
        tokenId <- feedbackTokenId(source.name)
      } yield tokenId -> rank.toDouble

      val tokenIds = scoresByToken.keys.toSet ++ ranksByToken.keys.toSet

      // hydrate if not empty
      if (ranksByToken.nonEmpty) {
        dataRecord.setFeatureValue(CANDIDATE_SOURCE_RANKS, ranksByToken)
      }
      if (scoresByToken.nonEmpty) {
        dataRecord.setFeatureValue(CANDIDATE_SOURCE_SCORES, scoresByToken)
      }
      if (tokenIds.nonEmpty) {
        dataRecord.setFeatureValue(CANDIDATE_ALGORITHMS, tokenIds)
      }
    }

    dataRecord
  }
}

View File

@ -1,79 +0,0 @@
package com.twitter.follow_recommendations.common.feature_hydration.adapters
import com.twitter.follow_recommendations.common.models.DisplayLocation
import com.twitter.ml.api.Feature.Binary
import com.twitter.ml.api.Feature.Continuous
import com.twitter.ml.api.Feature.Discrete
import com.twitter.ml.api.Feature.Text
import com.twitter.ml.api.util.FDsl._
import com.twitter.ml.api.DataRecord
import com.twitter.ml.api.FeatureContext
import com.twitter.ml.api.IRecordOneToOneAdapter
import com.twitter.onboarding.relevance.util.metadata.LanguageUtil
import com.twitter.product_mixer.core.model.marshalling.request.ClientContext
import com.twitter.snowflake.id.SnowflakeId
/**
 * Converts the request's [[ClientContext]] and [[DisplayLocation]] into a [[DataRecord]] of
 * user-account, user-context and request-meta features.
 */
object ClientContextAdapter extends IRecordOneToOneAdapter[(ClientContext, DisplayLocation)] {

  // we name features with `user.account` for relatively static user-related features
  val USER_COUNTRY: Text = new Text("user.account.country")
  val USER_LANGUAGE: Text = new Text("user.account.language")
  // we name features with `user.context` for more dynamic user-related features
  val USER_LANGUAGE_PREFIX: Text = new Text("user.context.language_prefix")
  val USER_CLIENT: Discrete = new Discrete("user.context.client")
  val USER_AGE: Continuous = new Continuous("user.context.age")
  val USER_IS_RECENT: Binary = new Binary("user.is.recent")
  // we name features with `meta` for meta info about the WTF recommendation request
  val META_DISPLAY_LOCATION: Text = new Text("meta.display_location")
  val META_POSITION: Discrete = new Discrete("meta.position")
  // This indicates whether a data point is from a random serving policy
  val META_IS_RANDOM: Binary = new Binary("prediction.engine.is_random")

  /** An account counts as recent when it is at most this many days old. */
  val RECENT_WIN_IN_DAYS: Int = 30
  /** Constant value written to [[META_POSITION]]. */
  val GOAL_META_POSITION: Long = 1L
  /** Constant value written to [[META_IS_RANDOM]]. */
  val GOAL_META_IS_RANDOM: Boolean = true

  override val getFeatureContext: FeatureContext = new FeatureContext(
    USER_COUNTRY,
    USER_LANGUAGE,
    USER_AGE,
    USER_LANGUAGE_PREFIX,
    USER_CLIENT,
    USER_IS_RECENT,
    META_DISPLAY_LOCATION,
    META_POSITION,
    META_IS_RANDOM
  )

  /**
   * we only want to set the relevant fields iff they exist to eliminate redundant information
   * we do some simple normalization on the language code
   * we set META_POSITION to 1 always
   * we set META_IS_RANDOM to true always to simulate a random serving distribution
   *
   * @param target ClientContext and DisplayLocation from the request
   * @return a new record holding the features that could be derived from `target`
   */
  override def adaptToDataRecord(target: (ClientContext, DisplayLocation)): DataRecord = {
    val (clientContext, displayLocation) = target
    val dr = new DataRecord()

    clientContext.countryCode.foreach { countryCode =>
      dr.setFeatureValue(USER_COUNTRY, countryCode)
    }

    clientContext.languageCode.foreach { rawLanguageCode =>
      val userLanguage = LanguageUtil.simplifyLanguage(rawLanguageCode)
      // The prefix is the first two characters of the simplified language code.
      val userLanguagePrefix = userLanguage.take(2)
      dr.setFeatureValue(USER_LANGUAGE, userLanguage)
      dr.setFeatureValue(USER_LANGUAGE_PREFIX, userLanguagePrefix)
    }

    clientContext.appId.foreach(appId => dr.setFeatureValue(USER_CLIENT, appId))

    // The signup time is derived from the user id; ids that carry no timestamp are skipped.
    for {
      userId <- clientContext.userId
      signupTime <- SnowflakeId.timeFromIdOpt(userId)
    } {
      // Sample the account age once so that USER_AGE and USER_IS_RECENT describe the same instant.
      val accountAge = signupTime.untilNow
      dr.setFeatureValue(USER_AGE, accountAge.inMillis.toDouble)
      dr.setFeatureValue(USER_IS_RECENT, accountAge.inDays <= RECENT_WIN_IN_DAYS)
    }

    dr.setFeatureValue(META_DISPLAY_LOCATION, displayLocation.toFsName)
    dr.setFeatureValue(META_POSITION, GOAL_META_POSITION)
    dr.setFeatureValue(META_IS_RANDOM, GOAL_META_IS_RANDOM)
    dr
  }
}

View File

@ -1,151 +0,0 @@
package com.twitter.follow_recommendations.common.feature_hydration.adapters
import com.twitter.ml.api.DataRecord
import com.twitter.ml.api.Feature
import com.twitter.ml.api.Feature.Continuous
import com.twitter.ml.api.FeatureContext
import com.twitter.ml.api.IRecordOneToOneAdapter
import com.twitter.ml.api.util.FDsl._
import com.twitter.ml.featurestore.catalog.features.customer_journey.PostNuxAlgorithmFeatures
import com.twitter.ml.featurestore.catalog.features.customer_journey.PostNuxAlgorithmIdAggregateFeatureGroup
import com.twitter.ml.featurestore.catalog.features.customer_journey.PostNuxAlgorithmTypeAggregateFeatureGroup
import scala.collection.JavaConverters._
/** [[PostNuxAlgorithmAdapter]] over the aggregates keyed by algorithm id. */
object PostNuxAlgorithmIdAdapter extends PostNuxAlgorithmAdapter {

  // FeatureStore prepends this prefix to feature names; it is stripped from the output
  // features to keep their names at a reasonable length.
  override val FeatureStorePrefix: String =
    "wtf_algorithm_id.customer_journey.post_nux_algorithm_id_aggregate_feature_group."

  override val PostNuxAlgorithmFeatureGroup: PostNuxAlgorithmFeatures =
    PostNuxAlgorithmIdAggregateFeatureGroup
}
/** [[PostNuxAlgorithmAdapter]] over the aggregates keyed by algorithm type. */
object PostNuxAlgorithmTypeAdapter extends PostNuxAlgorithmAdapter {

  // FeatureStore prepends this prefix to feature names; it is stripped from the output
  // features to keep their names at a reasonable length.
  override val FeatureStorePrefix: String =
    "wtf_algorithm_type.customer_journey.post_nux_algorithm_type_aggregate_feature_group."

  override val PostNuxAlgorithmFeatureGroup: PostNuxAlgorithmFeatures =
    PostNuxAlgorithmTypeAggregateFeatureGroup
}
/**
 * Adds log-ratio versions of the PostNux algorithm aggregate features to a [[DataRecord]].
 *
 * Input features are looked up under their FeatureStore-prefixed names. For every input feature
 * that is present, and whose window (7 or 30 days) has a positive backend-impression count, the
 * feature `<unprefixed name>-ratio-log` is set to `log((value + 1) / impressions)`.
 */
trait PostNuxAlgorithmAdapter extends IRecordOneToOneAdapter[DataRecord] {

  /** Feature group supplying the 7-day and 30-day aggregates and their impression counts. */
  val PostNuxAlgorithmFeatureGroup: PostNuxAlgorithmFeatures

  // The string that is attached to the feature name when it is fetched from feature store.
  val FeatureStorePrefix: String

  /**
   * This stores transformed aggregate features for PostNux algorithm aggregate features. The
   * transformation here is log-ratio, where ratio is the raw value (plus one) divided by # of
   * impressions.
   */
  case class TransformedAlgorithmFeatures(ratioLog: Continuous) {
    def getFeatures: Seq[Continuous] = Seq(ratioLog)
  }

  /** Returns the feature as it is named in records fetched from FeatureStore. */
  private def applyFeatureStorePrefix(feature: Continuous): Continuous =
    new Continuous(s"$FeatureStorePrefix${feature.getFeatureName}")

  // The list of input features WITH the prefix assigned to them by FeatureStore.
  lazy val allInputFeatures: Seq[Seq[Continuous]] = Seq(
    PostNuxAlgorithmFeatureGroup.Aggregate7DayFeatures.map(applyFeatureStorePrefix),
    PostNuxAlgorithmFeatureGroup.Aggregate30DayFeatures.map(applyFeatureStorePrefix)
  )

  // This is a list of the features WITHOUT the prefix assigned to them by FeatureStore.
  lazy val outputBaseFeatureNames: Seq[Seq[Continuous]] = Seq(
    PostNuxAlgorithmFeatureGroup.Aggregate7DayFeatures,
    PostNuxAlgorithmFeatureGroup.Aggregate30DayFeatures
  )

  // We use backend impression to calculate ratio values. The order matches allInputFeatures
  // (7 days first, then 30 days) because the two sequences are zipped in adaptToDataRecord.
  lazy val ratioDenominators: Seq[Continuous] = Seq(
    applyFeatureStorePrefix(PostNuxAlgorithmFeatureGroup.BackendImpressions7Days),
    applyFeatureStorePrefix(PostNuxAlgorithmFeatureGroup.BackendImpressions30Days)
  )

  /**
   * A mapping from a (prefixed) input feature to the corresponding set of transformed features.
   * This is used to compute the transformed features for each of the original ones.
   */
  private lazy val TransformedFeaturesMap: Map[Continuous, TransformedAlgorithmFeatures] =
    outputBaseFeatureNames.flatten.map { feature =>
      // The key carries the FeatureStore prefix, exactly like the entries of allInputFeatures.
      // The output feature drops the prefix to keep the length of feature names reasonable.
      applyFeatureStorePrefix(feature) ->
        TransformedAlgorithmFeatures(new Continuous(s"${feature.getFeatureName}-ratio-log"))
    }.toMap

  /**
   * Given a denominator (the number of impressions), this function returns another function that
   * adds the log-ratio transformed feature of an input feature to a DataRecord.
   *
   * @param originalDr     record the input feature values are read from
   * @param numImpressions denominator of the ratio; callers only pass values greater than zero
   * @return a function that writes the transformed feature into the given record (when the input
   *         feature is present in `originalDr`) and returns that record
   */
  private def addTransformedFeaturesToDataRecordFunc(
    originalDr: DataRecord,
    numImpressions: Double
  ): (DataRecord, Continuous) => DataRecord = { (record: DataRecord, feature: Continuous) =>
    // A missing input feature is read back as null, hence the Option wrapper.
    Option(originalDr.getFeatureValue(feature)).foreach { featureValue =>
      TransformedFeaturesMap.get(feature).foreach { transformedFeatures =>
        record.setFeatureValue(
          transformedFeatures.ratioLog,
          // We don't use log1p here since the values are ratios and adding 1 to the _ratio_ would
          // lead to logarithm of values between 1 and 2, essentially making all values the same.
          math.log((featureValue + 1) / numImpressions)
        )
      }
    }
    record
  }

  /**
   * @param record The input record whose PostNuxAlgorithm aggregates are to be transformed.
   * @return the input [[DataRecord]] with transformed aggregates added.
   */
  override def adaptToDataRecord(record: DataRecord): DataRecord = {
    if (record.continuousFeatures == null) {
      // There are no base features available, and hence no transformations.
      record
    } else {
      // The `foldLeft` below goes through pairs of (1) the number of impressions for a feature
      // group, which is the denominator when the ratio is calculated, and (2) the features of
      // that group, such as those calculated over 7 days or 30 days. The fold starts from the
      // input record itself.
      ratioDenominators
        .zip(allInputFeatures)
        .foldLeft(record) { (transformedRecord, numImpressionsAndFeatures) =>
          val (numImpressionsFeature, features) = numImpressionsAndFeatures
          Option(record.getFeatureValue(numImpressionsFeature)) match {
            case Some(numImpressions) if numImpressions > 0.0 =>
              // With the number of impressions fixed, we generate a function that adds the
              // log-ratio for each feature in the current DataRecord. The inner `foldLeft` goes
              // through all such features and applies that function while updating the kept
              // DataRecord.
              features.foldLeft(transformedRecord)(
                addTransformedFeaturesToDataRecordFunc(record, numImpressions))
            case _ =>
              // Impression count missing or not positive: skip this group entirely.
              transformedRecord
          }
        }
    }
  }

  /** All transformed (output) features this adapter can add to a record. */
  def getFeatures: Seq[Feature[_]] = TransformedFeaturesMap.values.flatMap(_.getFeatures).toSeq

  override def getFeatureContext: FeatureContext =
    new FeatureContext()
      .addFeatures(this.getFeatures.asJava)
}

View File

@ -1,91 +0,0 @@
package com.twitter.follow_recommendations.common.feature_hydration.adapters
import com.twitter.follow_recommendations.common.feature_hydration.common.HasPreFetchedFeature
import com.twitter.follow_recommendations.common.models.CandidateUser
import com.twitter.ml.api.Feature.Continuous
import com.twitter.ml.api.util.FDsl._
import com.twitter.ml.api.DataRecord
import com.twitter.ml.api.FeatureContext
import com.twitter.ml.api.IRecordOneToOneAdapter
import com.twitter.util.Time
/**
 * Extracts recent impression and follow features from pre-fetched data.
 *
 * This adapter mimics UserRecentWTFImpressionsAndFollowsAdapter (for user) and
 * RecentWTFImpressionsFeatureAdapter (for candidate). It emits user-level features and,
 * when available, user-candidate pair features.
 */
object PreFetchedFeatureAdapter
    extends IRecordOneToOneAdapter[(HasPreFetchedFeature, CandidateUser)] {

  // impression features
  val USER_NUM_RECENT_IMPRESSIONS: Continuous =
    new Continuous("user.prefetch.num_recent_impressions")
  val USER_LAST_IMPRESSION_DURATION: Continuous =
    new Continuous("user.prefetch.last_impression_duration")
  val CANDIDATE_NUM_RECENT_IMPRESSIONS: Continuous =
    new Continuous("user-candidate.prefetch.num_recent_impressions")
  val CANDIDATE_LAST_IMPRESSION_DURATION: Continuous =
    new Continuous("user-candidate.prefetch.last_impression_duration")

  // follow features
  val USER_NUM_RECENT_FOLLOWERS: Continuous =
    new Continuous("user.prefetch.num_recent_followers")
  val USER_NUM_RECENT_FOLLOWED_BY: Continuous =
    new Continuous("user.prefetch.num_recent_followed_by")
  val USER_NUM_RECENT_MUTUAL_FOLLOWS: Continuous =
    new Continuous("user.prefetch.num_recent_mutual_follows")

  // impression + follow features
  val USER_NUM_RECENT_FOLLOWED_IMPRESSIONS: Continuous =
    new Continuous("user.prefetch.num_recent_followed_impression")
  val USER_LAST_FOLLOWED_IMPRESSION_DURATION: Continuous =
    new Continuous("user.prefetch.last_followed_impression_duration")

  /**
   * Builds a record of pre-fetched features for one (target user, candidate) pair.
   *
   * Durations are measured in milliseconds against the time of the call. The candidate-level
   * features are only set when the target carries data for that candidate.
   *
   * @param record the target holding the pre-fetched data, and the candidate to describe
   * @return a new [[DataRecord]] with the features listed in [[getFeatureContext]]
   */
  override def adaptToDataRecord(record: (HasPreFetchedFeature, CandidateUser)): DataRecord = {
    val (user, candidateUser) = record
    val dataRecord = new DataRecord()
    val now = Time.now

    // set impression features for user, optionally for candidate
    dataRecord.setFeatureValue(USER_NUM_RECENT_IMPRESSIONS, user.numWtfImpressions.toDouble)
    dataRecord.setFeatureValue(
      USER_LAST_IMPRESSION_DURATION,
      (now - user.latestImpressionTime).inMillis.toDouble
    )
    user.getCandidateImpressionCounts(candidateUser.id).foreach { impressionCount =>
      dataRecord.setFeatureValue(CANDIDATE_NUM_RECENT_IMPRESSIONS, impressionCount.toDouble)
    }
    user.getCandidateLatestTime(candidateUser.id).foreach { lastSeenAt: Time =>
      dataRecord.setFeatureValue(
        CANDIDATE_LAST_IMPRESSION_DURATION,
        (now - lastSeenAt).inMillis.toDouble
      )
    }

    // set recent follow features for user
    // NOTE(review): the "followers" feature is filled from numRecentFollowedUserIds — confirm
    // that the feature name and the source field describe the same quantity.
    dataRecord.setFeatureValue(USER_NUM_RECENT_FOLLOWERS, user.numRecentFollowedUserIds.toDouble)
    dataRecord.setFeatureValue(
      USER_NUM_RECENT_FOLLOWED_BY,
      user.numRecentFollowedByUserIds.toDouble
    )
    dataRecord.setFeatureValue(USER_NUM_RECENT_MUTUAL_FOLLOWS, user.numRecentMutualFollows.toDouble)
    dataRecord.setFeatureValue(
      USER_NUM_RECENT_FOLLOWED_IMPRESSIONS,
      user.numFollowedImpressions.toDouble
    )
    // Long.MaxValue stands in when there is no followed impression.
    dataRecord.setFeatureValue(
      USER_LAST_FOLLOWED_IMPRESSION_DURATION,
      user.lastFollowedImpressionDurationMs.getOrElse(Long.MaxValue).toDouble
    )

    dataRecord
  }

  override def getFeatureContext: FeatureContext = new FeatureContext(
    USER_NUM_RECENT_IMPRESSIONS,
    USER_LAST_IMPRESSION_DURATION,
    CANDIDATE_NUM_RECENT_IMPRESSIONS,
    CANDIDATE_LAST_IMPRESSION_DURATION,
    USER_NUM_RECENT_FOLLOWERS,
    USER_NUM_RECENT_FOLLOWED_BY,
    USER_NUM_RECENT_MUTUAL_FOLLOWS,
    USER_NUM_RECENT_FOLLOWED_IMPRESSIONS,
    USER_LAST_FOLLOWED_IMPRESSION_DURATION
  )
}

View File

@ -1,18 +0,0 @@
# Scala library build target.
# NOTE(review): no `sources` attribute is given here, whereas another scala_library target
# in this repository passes sources = ["*.scala"]; presumably the build tool's default
# source selection applies — confirm.
# NOTE(review): if this target owns the feature_hydration/common Scala files, note that they
# import com.twitter.stitch, com.twitter.timelines.configapi, com.twitter.product_mixer and
# com.twitter.util.Time, none of which is obviously listed below; presumably those arrive
# transitively — confirm if strict dependency checking is enabled.
scala_library(
# presumably makes compiler warnings fail the build — confirm against the option-set definition
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
# third-party: Guice (core, assisted-inject, Scala bindings) and SLF4J
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/com/google/inject/extensions:guice-assistedinject",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/slf4j:slf4j-api",
"finatra/inject/inject-core/src/main/scala",
# follow-recommendations-service common packages: base, constants, models, utils
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/base",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/constants",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/models",
"follow-recommendations-service/common/src/main/scala/com/twitter/follow_recommendations/common/utils",
# com.twitter.ml.api base (the Scala sources import DataRecord / FeatureContext from com.twitter.ml.api)
"src/java/com/twitter/ml/api:api-base",
"util/util-slf4j-api/src/main/scala",
],
)

View File

@ -1,23 +0,0 @@
package com.twitter.follow_recommendations.common.feature_hydration.common
import com.twitter.follow_recommendations.common.models.CandidateUser
import com.twitter.follow_recommendations.common.models.HasDisplayLocation
import com.twitter.follow_recommendations.common.models.HasSimilarToContext
import com.twitter.ml.api.DataRecord
import com.twitter.ml.api.FeatureContext
import com.twitter.product_mixer.core.model.marshalling.request.HasClientContext
import com.twitter.stitch.Stitch
import com.twitter.timelines.configapi.HasParams
/**
 * Contract for a component that hydrates features for follow-recommendation candidates.
 *
 * An implementation exposes an identifier, a [[FeatureContext]], and a batched hydration
 * call that yields one [[DataRecord]] per candidate key in the returned map.
 */
trait FeatureSource {

  /** Identifier of this source, typed as [[FeatureSourceId]]. */
  def id: FeatureSourceId

  /**
   * The [[FeatureContext]] associated with this source.
   * NOTE(review): presumably the set of features `hydrateFeatures` may emit — confirm.
   */
  def featureContext: FeatureContext

  /**
   * Hydrates features for a batch of candidates on behalf of a target.
   *
   * @param target     the request target; it must provide client context, pre-fetched
   *                   features, params, similar-to context and a display location
   * @param candidates the candidates to hydrate
   * @return a [[Stitch]] of a map from candidate to its [[DataRecord]]; whether every input
   *         candidate is guaranteed a key is up to the implementation and not fixed here
   */
  def hydrateFeatures(
    target: HasClientContext with HasPreFetchedFeature with HasParams with HasSimilarToContext
      with HasDisplayLocation,
    candidates: Seq[CandidateUser]
  ): Stitch[Map[CandidateUser, DataRecord]]
}

View File

@ -1,19 +0,0 @@
package com.twitter.follow_recommendations.common.feature_hydration.common
/**
 * Marker type for feature-source identifiers.
 *
 * The trait is sealed, so every identifier has to be declared in this file; the ones
 * defined here are the singleton objects nested in the companion.
 */
sealed trait FeatureSourceId

/** Holder for the identifier singletons; each one is a plain `object`, not a `case object`. */
object FeatureSourceId {

  object CandidateAlgorithmSourceId extends FeatureSourceId

  object ClientContextSourceId extends FeatureSourceId

  object FeatureStoreSourceId extends FeatureSourceId

  object FeatureStoreTimelinesAuthorSourceId extends FeatureSourceId

  object FeatureStoreGizmoduckSourceId extends FeatureSourceId

  object FeatureStoreUserMetricCountsSourceId extends FeatureSourceId

  object FeatureStoreNotificationSourceId extends FeatureSourceId

  object FeatureStorePrecomputedNotificationSourceId extends FeatureSourceId

  object FeatureStorePostNuxAlgorithmSourceId extends FeatureSourceId

  // Marked deprecated without a message or version; references to it produce a
  // deprecation warning at compile time.
  @deprecated
  object StratoFeatureHydrationSourceId extends FeatureSourceId

  object PreFetchedFeatureSourceId extends FeatureSourceId

  object UserScoringFeatureSourceId extends FeatureSourceId
}

View File

@ -1,25 +0,0 @@
package com.twitter.follow_recommendations.common.feature_hydration.common
import com.twitter.follow_recommendations.common.models.HasMutualFollowedUserIds
import com.twitter.follow_recommendations.common.models.HasWtfImpressions
import com.twitter.follow_recommendations.common.models.WtfImpression
import com.twitter.util.Time
/**
 * Derives "followed impression" values from the inherited `wtfImpressions` and
 * `recentFollowedUserIds` members.
 *
 * All three members are `lazy val`s: each is computed on first access and then cached.
 */
trait HasPreFetchedFeature extends HasMutualFollowedUserIds with HasWtfImpressions {

  /**
   * The impressions from `wtfImpressions` whose `candidateId` is contained in
   * `recentFollowedUserIds`. Empty when either of the two is absent.
   */
  lazy val followedImpressions: Seq[WtfImpression] =
    wtfImpressions.toSeq.flatMap { impressions =>
      impressions.filter { impression =>
        recentFollowedUserIds.exists(_.contains(impression.candidateId))
      }
    }

  /** Number of entries in [[followedImpressions]]. */
  lazy val numFollowedImpressions: Int = followedImpressions.size

  /**
   * Milliseconds between now and the greatest `latestTime` among [[followedImpressions]],
   * or `None` when there are none. "Now" is read when this value is first accessed, and
   * the result is cached from then on.
   */
  lazy val lastFollowedImpressionDurationMs: Option[Long] =
    if (followedImpressions.isEmpty) None
    else {
      val now = Time.now
      val mostRecent = followedImpressions.map(_.latestTime).max
      Some((now - mostRecent).inMillis)
    }
}

Some files were not shown because too many files have changed in this diff Show More