Delete graph-feature-service directory

This commit is contained in:
dogemanttv 2024-01-10 17:07:05 -06:00 committed by GitHub
parent 14f78e176b
commit 63be8c971c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 0 additions and 2571 deletions

View File

@ -1,67 +0,0 @@
alias(
name = "graph_feature_service-server",
target = ":graph_feature_service-server_lib",
)
target(
name = "graph_feature_service-server_lib",
dependencies = [
"graph-feature-service/src/main/scala/com/twitter/graph_feature_service/server",
],
)
alias(
name = "graph_feature_service-worker",
target = ":graph_feature_service-worker_lib",
)
target(
name = "graph_feature_service-worker_lib",
dependencies = [
"graph-feature-service/src/main/scala/com/twitter/graph_feature_service/worker",
],
)
jvm_binary(
name = "server-bin",
basename = "graph_feature_service-server",
main = "com.twitter.graph_feature_service.server.Main",
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
":graph_feature_service-server",
"3rdparty/jvm/ch/qos/logback:logback-classic",
"finagle/finagle-zipkin-scribe/src/main/scala",
"loglens/loglens-logback/src/main/scala/com/twitter/loglens/logback",
"twitter-server/logback-classic/src/main/scala",
],
)
jvm_binary(
name = "worker-bin",
basename = "graph_feature_service-worker",
main = "com.twitter.graph_feature_service.worker.Main",
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
":graph_feature_service-worker",
"3rdparty/jvm/ch/qos/logback:logback-classic",
"finagle/finagle-zipkin-scribe/src/main/scala",
"loglens/loglens-logback/src/main/scala/com/twitter/loglens/logback",
"twitter-server/logback-classic/src/main/scala",
],
)
jvm_app(
name = "server-bundle",
basename = "graph_feature_service-server-dist",
binary = ":server-bin",
tags = ["bazel-compatible"],
)
jvm_app(
name = "worker-bundle",
basename = "graph_feature_service-worker-dist",
binary = ":worker-bin",
tags = ["bazel-compatible"],
)

View File

@ -1,3 +0,0 @@
# Graph Feature Service
Graph Feature Service (GFS) is a distributed system that can provide various graph features for given pairs of users. For instance, given source user A and candidate user C, GFS can answer questions like “how many of As followings have favorited C”, “how many of As followings are following C”, and “how much C is similar to the users that A has favorited“.

View File

@ -1,62 +0,0 @@
# Common thrift types
GFS uses several thrift datastructures which are common to multiple queries. They are listed below.
## EdgeType
`EdgeType` is a thrift enum which specifies which edge types to query for the graph.
```thrift
enum EdgeType {
FOLLOWING,
FOLLOWED_BY,
FAVORITE,
FAVORITED_BY,
RETWEET,
RETWEETED_BY,
REPLY,
REPLYED_BY,
MENTION,
MENTIONED_BY,
MUTUAL_FOLLOW,
SIMILAR_TO, // more edge types (like block, report, etc.) can be supported later.
RESERVED_12,
RESERVED_13,
RESERVED_14,
RESERVED_15,
RESERVED_16,
RESERVED_17,
RESERVED_18,
RESERVED_19,
RESERVED_20
}
```
For an example of how this is used, consider the `GetNeighbors` query. If we set the `edgeType` field
of the `GfsNeighborsRequest`, the response will contain all the users that the specified user follows.
If, on the other hand, we set `edgeType` to be `FollowedBy` it will return all the users who are
followed by the specified user.
## FeatureType
`FeatureType` is a thrift struct which is used in queries which require two edge types.
```thrift
struct FeatureType {
1: required EdgeType leftEdgeType // edge type from source user
2: required EdgeType rightEdgeType // edge type from candidate user
}(persisted="true")
```
## UserWithScore
The candidate generation queries return lists of candidates together with a computed score for the
relevant feature. `UserWithScore` is a thrift struct which bundles together a candidate's ID with
the score.
```thrift
struct UserWithScore {
1: required i64 userId
2: required double score
}
```

View File

@ -1,43 +0,0 @@
# GetIntersection
## Request and response syntax
A `GetIntersection` call takes as input a `GfsIntersectionRequest` thrift struct.
```thrift
struct GfsIntersectionRequest {
1: required i64 userId
2: required list<i64> candidateUserIds
3: required list<FeatureType> featureTypes
}
```
The response is returned in a `GfsIntersectionResponse` thrift struct.
```thrift
struct GfsIntersectionResponse {
1: required i64 userId
2: required list<GfsIntersectionResult> results
}
struct GfsIntersectionResult {
1: required i64 candidateUserId
2: required list<IntersectionValue> intersectionValues
}
struct IntersectionValue {
1: required FeatureType featureType
2: optional i32 count
3: optional list<i64> intersectionIds
4: optional i32 leftNodeDegree
5: optional i32 rightNodeDegree
}(persisted="true")
```
## Behavior
The `GfsIntersectionResponse` contains in its `results` field a `GfsIntersectionResult` for every candidate in `candidateIds` which contains an `IntersectionValue` for every `FeatureType` in the request's `featureTypes` field.
The `IntersectionValue` contains the size of the intersection between the `leftEdgeType` edges from `userId` and the `rightEdgeType` edges from `candidateId` in the `count` field, as well as their respective degrees in the graphs in `leftNodeDegree` and `rightNodeDegree` respectively.
**Note:** the `intersectionIds` field currently only contains `Nil`.

View File

@ -1,8 +0,0 @@
scala_library(
platform = "java8",
tags = [
"bazel-compatible",
"bazel-only",
],
dependencies = ["src/scala/com/twitter/storehaus_internal/util"],
)

View File

@ -1,73 +0,0 @@
package com.twitter.graph_feature_service.common
import com.twitter.conversions.DurationOps._
import com.twitter.util.Duration
import com.twitter.util.Time
import java.nio.ByteBuffer
import scala.util.hashing.MurmurHash3
object Configs {
// NOTE: notify #recos-platform slack room, if you want to change this.
// This SHOULD be updated together with NUM_SHARDS in worker.aurora
final val NumGraphShards: Int = 40
final val TopKRealGraph: Int = 512
final val BaseHdfsPath: String = "/user/cassowary/processed/gfs/constant_db/"
// whether or not to write in_value and out_value graphs. Used in the scalding job.
final val EnableValueGraphs: Boolean = true
// whether or not to write in_key and out_key graphs. Used in the scalding job.
final val EnableKeyGraphs: Boolean = false
final val FollowOutValPath: String = "follow_out_val/"
final val FollowOutKeyPath: String = "follow_out_key/"
final val FollowInValPath: String = "follow_in_val/"
final val FollowInKeyPath: String = "follow_in_key/"
final val MutualFollowValPath: String = "mutual_follow_val/"
final val MutualFollowKeyPath: String = "mutual_follow_key/"
final val FavoriteOutValPath: String = "favorite_out_val/"
final val FavoriteInValPath: String = "favorite_in_val/"
final val FavoriteOutKeyPath: String = "favorite_out_key/"
final val FavoriteInKeyPath: String = "favorite_in_key/"
final val RetweetOutValPath: String = "retweet_out_val/"
final val RetweetInValPath: String = "retweet_in_val/"
final val RetweetOutKeyPath: String = "retweet_out_key/"
final val RetweetInKeyPath: String = "retweet_in_key/"
final val MentionOutValPath: String = "mention_out_val/"
final val MentionInValPath: String = "mention_in_val/"
final val MentionOutKeyPath: String = "mention_out_key/"
final val MentionInKeyPath: String = "mention_in_key/"
final val MemCacheTTL: Duration = 8.hours
final val RandomSeed: Int = 39582942
def getTimedHdfsShardPath(shardId: Int, path: String, time: Time): String = {
val timeStr = time.format("yyyy/MM/dd")
s"$path/$timeStr/shard_$shardId"
}
def getHdfsPath(path: String, overrideBaseHdfsPath: Option[String] = None): String = {
val basePath = overrideBaseHdfsPath.getOrElse(BaseHdfsPath)
s"$basePath$path"
}
private def hash(kArr: Array[Byte], seed: Int): Int = {
MurmurHash3.bytesHash(kArr, seed) & 0x7fffffff // keep positive
}
private def hashLong(l: Long, seed: Int): Int = {
hash(ByteBuffer.allocate(8).putLong(l).array(), seed)
}
def shardForUser(userId: Long): Int = {
hashLong(userId, RandomSeed) % NumGraphShards
}
}

View File

@ -1,35 +0,0 @@
scala_library(
sources = ["**/*.scala"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/com/twitter/storehaus:core",
"3rdparty/jvm/javax/inject:javax.inject",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/lz4:lz4-java",
"3rdparty/jvm/org/slf4j:slf4j-api",
"discovery-common/src/main/scala/com/twitter/discovery/common/stats",
"finagle/finagle-http/src/main/scala",
"finatra-internal/decider/src/main/scala",
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra/inject/inject-app/src/main/scala",
"finatra/inject/inject-core/src/main/scala",
"finatra/inject/inject-server/src/main/scala",
"finatra/inject/inject-thrift-client/src/main/scala",
"finatra/inject/inject-utils/src/main/scala",
"finatra/thrift/src/main/scala/com/twitter/finatra/thrift",
"finatra/thrift/src/main/scala/com/twitter/finatra/thrift:controller",
"finatra/thrift/src/main/scala/com/twitter/finatra/thrift/filters",
"finatra/thrift/src/main/scala/com/twitter/finatra/thrift/routing",
"graph-feature-service/src/main/resources",
"graph-feature-service/src/main/scala/com/twitter/graph_feature_service/common",
"graph-feature-service/src/main/scala/com/twitter/graph_feature_service/util",
"graph-feature-service/src/main/thrift/com/twitter/graph_feature_service:graph_feature_service_thrift-scala",
"hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common",
"servo/request/src/main/scala",
"src/scala/com/twitter/storehaus_internal/memcache",
"util/util-app/src/main/scala",
"util/util-slf4j-api/src/main/scala",
],
)

View File

@ -1,56 +0,0 @@
package com.twitter.graph_feature_service.server
import com.google.inject.Module
import com.twitter.finatra.decider.modules.DeciderModule
import com.twitter.finatra.mtls.thriftmux.Mtls
import com.twitter.finatra.thrift.ThriftServer
import com.twitter.finatra.thrift.filters.{
AccessLoggingFilter,
LoggingMDCFilter,
StatsFilter,
ThriftMDCFilter,
TraceIdMDCFilter
}
import com.twitter.finatra.mtls.thriftmux.modules.MtlsThriftWebFormsModule
import com.twitter.finatra.thrift.routing.ThriftRouter
import com.twitter.graph_feature_service.server.controllers.ServerController
import com.twitter.graph_feature_service.server.handlers.ServerWarmupHandler
import com.twitter.graph_feature_service.server.modules.{
GetIntersectionStoreModule,
GraphFeatureServiceWorkerClientsModule,
ServerFlagsModule
}
import com.twitter.graph_feature_service.thriftscala
import com.twitter.inject.thrift.modules.ThriftClientIdModule
object Main extends ServerMain
class ServerMain extends ThriftServer with Mtls {
override val name = "graph_feature_service-server"
override val modules: Seq[Module] = {
Seq(
ServerFlagsModule,
DeciderModule,
ThriftClientIdModule,
GraphFeatureServiceWorkerClientsModule,
GetIntersectionStoreModule,
new MtlsThriftWebFormsModule[thriftscala.Server.MethodPerEndpoint](this)
)
}
override def configureThrift(router: ThriftRouter): Unit = {
router
.filter[LoggingMDCFilter]
.filter[TraceIdMDCFilter]
.filter[ThriftMDCFilter]
.filter[AccessLoggingFilter]
.filter[StatsFilter]
.add[ServerController]
}
override protected def warmup(): Unit = {
handle[ServerWarmupHandler]()
}
}

View File

@ -1,46 +0,0 @@
package com.twitter.graph_feature_service.server.controllers
import com.twitter.discovery.common.stats.DiscoveryStatsFilter
import com.twitter.finagle.Service
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finatra.thrift.Controller
import com.twitter.graph_feature_service.server.handlers.ServerGetIntersectionHandler.GetIntersectionRequest
import com.twitter.graph_feature_service.server.handlers.ServerGetIntersectionHandler
import com.twitter.graph_feature_service.thriftscala
import com.twitter.graph_feature_service.thriftscala.Server.GetIntersection
import com.twitter.graph_feature_service.thriftscala.Server.GetPresetIntersection
import com.twitter.graph_feature_service.thriftscala._
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class ServerController @Inject() (
serverGetIntersectionHandler: ServerGetIntersectionHandler
)(
implicit statsReceiver: StatsReceiver)
extends Controller(thriftscala.Server) {
private val getIntersectionService: Service[GetIntersectionRequest, GfsIntersectionResponse] =
new DiscoveryStatsFilter(statsReceiver.scope("srv").scope("get_intersection"))
.andThen(Service.mk(serverGetIntersectionHandler))
val getIntersection: Service[GetIntersection.Args, GfsIntersectionResponse] = { args =>
// TODO: Disable updateCache after HTL switch to use PresetIntersection endpoint.
getIntersectionService(
GetIntersectionRequest.fromGfsIntersectionRequest(args.request, cacheable = true))
}
handle(GetIntersection) { getIntersection }
def getPresetIntersection: Service[
GetPresetIntersection.Args,
GfsIntersectionResponse
] = { args =>
// TODO: Refactor after HTL switch to PresetIntersection
val cacheable = args.request.presetFeatureTypes == PresetFeatureTypes.HtlTwoHop
getIntersectionService(
GetIntersectionRequest.fromGfsPresetIntersectionRequest(args.request, cacheable))
}
handle(GetPresetIntersection) { getPresetIntersection }
}

View File

@ -1,198 +0,0 @@
package com.twitter.graph_feature_service.server.handlers
import com.twitter.finagle.stats.Stat
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.graph_feature_service.server.handlers.ServerGetIntersectionHandler.GetIntersectionRequest
import com.twitter.graph_feature_service.server.stores.FeatureTypesEncoder
import com.twitter.graph_feature_service.server.stores.GetIntersectionStore.GetIntersectionQuery
import com.twitter.graph_feature_service.thriftscala.PresetFeatureTypes
import com.twitter.graph_feature_service.thriftscala._
import com.twitter.graph_feature_service.util.FeatureTypesCalculator
import com.twitter.servo.request.RequestHandler
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Future
import com.twitter.util.Memoize
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
@Singleton
class ServerGetIntersectionHandler @Inject() (
@Named("ReadThroughGetIntersectionStore")
readThroughStore: ReadableStore[GetIntersectionQuery, CachedIntersectionResult],
@Named("BypassCacheGetIntersectionStore")
readOnlyStore: ReadableStore[GetIntersectionQuery, CachedIntersectionResult]
)(
implicit statsReceiver: StatsReceiver)
extends RequestHandler[GetIntersectionRequest, GfsIntersectionResponse] {
import ServerGetIntersectionHandler._
// TODO: Track all the stats based on PresetFeatureType and update the dashboard
private val stats: StatsReceiver = statsReceiver.scope("srv").scope("get_intersection")
private val numCandidatesCount = stats.counter("total_num_candidates")
private val numCandidatesStat = stats.stat("num_candidates")
private val numFeaturesStat = stats.stat("num_features")
private val userEmptyCount = stats.counter("user_empty_count")
private val candidateEmptyRateStat = stats.stat("candidate_empty_rate")
private val candidateNumEmptyStat = stats.stat("candidate_num_empty")
private val missedRateStat = stats.stat("miss_rate")
private val numMissedStat = stats.stat("num_missed")
// Assume the order from HTL doesn't change. Only log the HTL query now.
private val featureStatMap = FeatureTypesCalculator.presetFeatureTypes.map { feature =>
val featureString = s"${feature.leftEdgeType.name}_${feature.rightEdgeType.name}"
feature -> Array(
stats.counter(s"feature_type_${featureString}_total"),
stats.counter(s"feature_type_${featureString}_count_zero"),
stats.counter(s"feature_type_${featureString}_left_zero"),
stats.counter(s"feature_type_${featureString}_right_zero")
)
}.toMap
private val sourceCandidateNumStats = Memoize[PresetFeatureTypes, Stat] { presetFeature =>
stats.stat(s"source_candidate_num_${presetFeature.name}")
}
override def apply(request: GetIntersectionRequest): Future[GfsIntersectionResponse] = {
val featureTypes = request.calculatedFeatureTypes
val numCandidates = request.candidateUserIds.length
val numFeatures = featureTypes.length
numCandidatesCount.incr(numCandidates)
numCandidatesStat.add(numCandidates)
numFeaturesStat.add(numFeatures)
sourceCandidateNumStats(request.presetFeatureTypes).add(numCandidates)
// Note: do not change the orders of features and candidates.
val candidateIds = request.candidateUserIds
if (featureTypes.isEmpty || candidateIds.isEmpty) {
Future.value(DefaultGfsIntersectionResponse)
} else {
Future
.collect {
val getIntersectionStore = if (request.cacheable) readThroughStore else readOnlyStore
getIntersectionStore.multiGet(GetIntersectionQuery.buildQueries(request))
}.map { responses =>
val results = responses.collect {
case (query, Some(result)) =>
query.candidateId -> GfsIntersectionResult(
query.candidateId,
query.calculatedFeatureTypes.zip(result.values).map {
case (featureType, value) =>
IntersectionValue(
featureType,
Some(value.count),
if (value.intersectionIds.isEmpty) None else Some(value.intersectionIds),
Some(value.leftNodeDegree),
Some(value.rightNodeDegree)
)
}
)
}
// Keep the response order same as input
val processedResults = candidateIds.map { candidateId =>
results.getOrElse(candidateId, GfsIntersectionResult(candidateId, List.empty))
}
val candidateEmptyNum =
processedResults.count(
_.intersectionValues.exists(value => isZero(value.rightNodeDegree)))
val numMissed = processedResults.count(_.intersectionValues.size != numFeatures)
if (processedResults.exists(
_.intersectionValues.forall(value => isZero(value.leftNodeDegree)))) {
userEmptyCount.incr()
}
candidateNumEmptyStat.add(candidateEmptyNum)
candidateEmptyRateStat.add(candidateEmptyNum.toFloat / numCandidates)
numMissedStat.add(numMissed)
missedRateStat.add(numMissed.toFloat / numCandidates)
processedResults.foreach { result =>
result.intersectionValues.zip(featureTypes).foreach {
case (value, featureType) =>
featureStatMap.get(featureType).foreach { statsArray =>
statsArray(TotalIndex).incr()
if (isZero(value.count)) {
statsArray(CountIndex).incr()
}
if (isZero(value.leftNodeDegree)) {
statsArray(LeftIndex).incr()
}
if (isZero(value.rightNodeDegree)) {
statsArray(RightIndex).incr()
}
}
}
}
GfsIntersectionResponse(processedResults)
}
}
}
}
private[graph_feature_service] object ServerGetIntersectionHandler {
case class GetIntersectionRequest(
userId: Long,
candidateUserIds: Seq[Long],
featureTypes: Seq[FeatureType],
presetFeatureTypes: PresetFeatureTypes,
intersectionIdLimit: Option[Int],
cacheable: Boolean) {
lazy val calculatedFeatureTypes: Seq[FeatureType] =
FeatureTypesCalculator.getFeatureTypes(presetFeatureTypes, featureTypes)
lazy val calculatedFeatureTypesString: String =
FeatureTypesEncoder(calculatedFeatureTypes)
}
object GetIntersectionRequest {
def fromGfsIntersectionRequest(
request: GfsIntersectionRequest,
cacheable: Boolean
): GetIntersectionRequest = {
GetIntersectionRequest(
request.userId,
request.candidateUserIds,
request.featureTypes,
PresetFeatureTypes.Empty,
request.intersectionIdLimit,
cacheable)
}
def fromGfsPresetIntersectionRequest(
request: GfsPresetIntersectionRequest,
cacheable: Boolean
): GetIntersectionRequest = {
GetIntersectionRequest(
request.userId,
request.candidateUserIds,
List.empty,
request.presetFeatureTypes,
request.intersectionIdLimit,
cacheable)
}
}
private val DefaultGfsIntersectionResponse = GfsIntersectionResponse()
private val TotalIndex = 0
private val CountIndex = 1
private val LeftIndex = 2
private val RightIndex = 3
def isZero(opt: Option[Int]): Boolean = {
!opt.exists(_ != 0)
}
}

View File

@ -1,45 +0,0 @@
package com.twitter.graph_feature_service.server.handlers
import com.twitter.finatra.thrift.routing.ThriftWarmup
import com.twitter.graph_feature_service.thriftscala.EdgeType.FavoritedBy
import com.twitter.graph_feature_service.thriftscala.EdgeType.FollowedBy
import com.twitter.graph_feature_service.thriftscala.EdgeType.Following
import com.twitter.graph_feature_service.thriftscala.Server.GetIntersection
import com.twitter.graph_feature_service.thriftscala.FeatureType
import com.twitter.graph_feature_service.thriftscala.GfsIntersectionRequest
import com.twitter.inject.utils.Handler
import com.twitter.scrooge.Request
import com.twitter.util.logging.Logger
import javax.inject.Inject
import javax.inject.Singleton
import scala.util.Random
@Singleton
class ServerWarmupHandler @Inject() (warmup: ThriftWarmup) extends Handler {
val logger: Logger = Logger("WarmupHandler")
// TODO: Add the testing accounts to warm-up the service.
private val testingAccounts: Array[Long] = Seq.empty.toArray
private def getRandomRequest: GfsIntersectionRequest = {
GfsIntersectionRequest(
testingAccounts(Random.nextInt(testingAccounts.length)),
testingAccounts,
Seq(FeatureType(Following, FollowedBy), FeatureType(Following, FavoritedBy))
)
}
override def handle(): Unit = {
warmup.sendRequest(
GetIntersection,
Request(
GetIntersection.Args(
getRandomRequest
)),
10
)()
logger.info("Warmup Done!")
}
}

View File

@ -1,91 +0,0 @@
package com.twitter.graph_feature_service.server.modules
import com.google.inject.Provides
import com.twitter.bijection.scrooge.CompactScalaCodec
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.graph_feature_service.common.Configs._
import com.twitter.graph_feature_service.server.stores.GetIntersectionStore
import com.twitter.graph_feature_service.server.stores.GetIntersectionStore.GetIntersectionQuery
import com.twitter.graph_feature_service.thriftscala.CachedIntersectionResult
import com.twitter.hermit.store.common.ObservedMemcachedReadableStore
import com.twitter.inject.TwitterModule
import com.twitter.inject.annotations.Flag
import com.twitter.storehaus.ReadableStore
import com.twitter.storehaus_internal.memcache.MemcacheStore
import com.twitter.storehaus_internal.util.{ClientName, ZkEndPoint}
import com.twitter.util.Duration
import javax.inject.{Named, Singleton}
/**
* Initialize the MemCache based GetIntersectionStore.
* The Key of MemCache is UserId~CandidateId~FeatureTypes~IntersectionIdLimit.
*/
object GetIntersectionStoreModule extends TwitterModule {
private[this] val requestTimeout: Duration = 25.millis
private[this] val retries: Int = 0
@Provides
@Named("ReadThroughGetIntersectionStore")
@Singleton
def provideReadThroughGetIntersectionStore(
graphFeatureServiceWorkerClients: GraphFeatureServiceWorkerClients,
serviceIdentifier: ServiceIdentifier,
@Flag(ServerFlagNames.MemCacheClientName) memCacheName: String,
@Flag(ServerFlagNames.MemCachePath) memCachePath: String
)(
implicit statsReceiver: StatsReceiver
): ReadableStore[GetIntersectionQuery, CachedIntersectionResult] = {
buildMemcacheStore(
graphFeatureServiceWorkerClients,
memCacheName,
memCachePath,
serviceIdentifier)
}
@Provides
@Named("BypassCacheGetIntersectionStore")
@Singleton
def provideReadOnlyGetIntersectionStore(
graphFeatureServiceWorkerClients: GraphFeatureServiceWorkerClients,
)(
implicit statsReceiver: StatsReceiver
): ReadableStore[GetIntersectionQuery, CachedIntersectionResult] = {
// Bypass the Memcache.
GetIntersectionStore(graphFeatureServiceWorkerClients, statsReceiver)
}
private[this] def buildMemcacheStore(
graphFeatureServiceWorkerClients: GraphFeatureServiceWorkerClients,
memCacheName: String,
memCachePath: String,
serviceIdentifier: ServiceIdentifier,
)(
implicit statsReceiver: StatsReceiver
): ReadableStore[GetIntersectionQuery, CachedIntersectionResult] = {
val backingStore = GetIntersectionStore(graphFeatureServiceWorkerClients, statsReceiver)
val cacheClient = MemcacheStore.memcachedClient(
name = ClientName(memCacheName),
dest = ZkEndPoint(memCachePath),
timeout = requestTimeout,
retries = retries,
serviceIdentifier = serviceIdentifier,
statsReceiver = statsReceiver
)
ObservedMemcachedReadableStore.fromCacheClient[GetIntersectionQuery, CachedIntersectionResult](
backingStore = backingStore,
cacheClient = cacheClient,
ttl = MemCacheTTL
)(
valueInjection = LZ4Injection.compose(CompactScalaCodec(CachedIntersectionResult)),
statsReceiver = statsReceiver.scope("mem_cache"),
keyToString = { key =>
s"L~${key.userId}~${key.candidateId}~${key.featureTypesString}~${key.intersectionIdLimit}"
}
)
}
}

View File

@ -1,51 +0,0 @@
package com.twitter.graph_feature_service.server.modules
import com.google.inject.Provides
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.mtls.client.MtlsStackClient._
import com.twitter.finagle.ThriftMux
import com.twitter.finagle.service.RetryBudget
import com.twitter.graph_feature_service.thriftscala
import com.twitter.inject.TwitterModule
import com.twitter.inject.annotations.Flag
import com.twitter.util.{Await, Duration}
import javax.inject.Singleton
case class GraphFeatureServiceWorkerClients(
workers: Seq[thriftscala.Worker.MethodPerEndpoint])
object GraphFeatureServiceWorkerClientsModule extends TwitterModule {
private[this] val closeableGracePeriod: Duration = 1.second
private[this] val requestTimeout: Duration = 25.millis
@Provides
@Singleton
def provideGraphFeatureServiceWorkerClient(
@Flag(ServerFlagNames.NumWorkers) numWorkers: Int,
@Flag(ServerFlagNames.ServiceRole) serviceRole: String,
@Flag(ServerFlagNames.ServiceEnv) serviceEnv: String,
serviceIdentifier: ServiceIdentifier
): GraphFeatureServiceWorkerClients = {
val workers: Seq[thriftscala.Worker.MethodPerEndpoint] =
(0 until numWorkers).map { id =>
val dest = s"/srv#/$serviceEnv/local/$serviceRole/graph_feature_service-worker-$id"
val client = ThriftMux.client
.withRequestTimeout(requestTimeout)
.withRetryBudget(RetryBudget.Empty)
.withMutualTls(serviceIdentifier)
.build[thriftscala.Worker.MethodPerEndpoint](dest, s"worker-$id")
onExit {
val closeable = client.asClosable
Await.result(closeable.close(closeableGracePeriod), closeableGracePeriod)
}
client
}
GraphFeatureServiceWorkerClients(workers)
}
}

View File

@ -1,17 +0,0 @@
package com.twitter.graph_feature_service.server.modules
import com.twitter.bijection.Injection
import scala.util.Try
import net.jpountz.lz4.{LZ4CompressorWithLength, LZ4DecompressorWithLength, LZ4Factory}
object LZ4Injection extends Injection[Array[Byte], Array[Byte]] {
private val lz4Factory = LZ4Factory.fastestInstance()
private val fastCompressor = new LZ4CompressorWithLength(lz4Factory.fastCompressor())
private val decompressor = new LZ4DecompressorWithLength(lz4Factory.fastDecompressor())
override def apply(a: Array[Byte]): Array[Byte] = LZ4Injection.fastCompressor.compress(a)
override def invert(b: Array[Byte]): Try[Array[Byte]] = Try {
LZ4Injection.decompressor.decompress(b)
}
}

View File

@ -1,31 +0,0 @@
package com.twitter.graph_feature_service.server.modules
import com.twitter.inject.TwitterModule
object ServerFlagNames {
final val NumWorkers = "service.num_workers"
final val ServiceRole = "service.role"
final val ServiceEnv = "service.env"
final val MemCacheClientName = "service.mem_cache_client_name"
final val MemCachePath = "service.mem_cache_path"
}
/**
* Initializes references to the flag values defined in the aurora.deploy file.
* To check what the flag values are initialized in runtime, search FlagsModule in stdout
*/
object ServerFlagsModule extends TwitterModule {
import ServerFlagNames._
flag[Int](NumWorkers, "Num of workers")
flag[String](ServiceRole, "Service Role")
flag[String](ServiceEnv, "Service Env")
flag[String](MemCacheClientName, "MemCache Client Name")
flag[String](MemCachePath, "MemCache Path")
}

View File

@ -1,16 +0,0 @@
package com.twitter.graph_feature_service.server.stores
import com.twitter.graph_feature_service.common.Configs.RandomSeed
import com.twitter.graph_feature_service.thriftscala.FeatureType
import scala.util.hashing.MurmurHash3
object FeatureTypesEncoder {
def apply(featureTypes: Seq[FeatureType]): String = {
val byteArray = featureTypes.flatMap { featureType =>
Array(featureType.leftEdgeType.getValue.toByte, featureType.rightEdgeType.getValue.toByte)
}.toArray
(MurmurHash3.bytesHash(byteArray, RandomSeed) & 0x7fffffff).toString // keep positive
}
}

View File

@ -1,181 +0,0 @@
package com.twitter.graph_feature_service.server.stores
import com.twitter.finagle.RequestTimeoutException
import com.twitter.finagle.stats.{Stat, StatsReceiver}
import com.twitter.graph_feature_service.server.handlers.ServerGetIntersectionHandler.GetIntersectionRequest
import com.twitter.graph_feature_service.server.modules.GraphFeatureServiceWorkerClients
import com.twitter.graph_feature_service.server.stores.GetIntersectionStore.GetIntersectionQuery
import com.twitter.graph_feature_service.thriftscala._
import com.twitter.inject.Logging
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Future
import javax.inject.Singleton
import scala.collection.mutable.ArrayBuffer
@Singleton
case class GetIntersectionStore(
graphFeatureServiceWorkerClients: GraphFeatureServiceWorkerClients,
statsReceiver: StatsReceiver)
extends ReadableStore[GetIntersectionQuery, CachedIntersectionResult]
with Logging {
import GetIntersectionStore._
private val stats = statsReceiver.scope("get_intersection_store")
private val requestCount = stats.counter(name = "request_count")
private val aggregatorLatency = stats.stat("aggregator_latency")
private val timeOutCounter = stats.counter("worker_timeouts")
private val unknownErrorCounter = stats.counter("unknown_errors")
override def multiGet[K1 <: GetIntersectionQuery](
ks: Set[K1]
): Map[K1, Future[Option[CachedIntersectionResult]]] = {
if (ks.isEmpty) {
Map.empty
} else {
requestCount.incr()
val head = ks.head
// We assume all the GetIntersectionQuery use the same userId and featureTypes
val userId = head.userId
val featureTypes = head.featureTypes
val presetFeatureTypes = head.presetFeatureTypes
val calculatedFeatureTypes = head.calculatedFeatureTypes
val intersectionIdLimit = head.intersectionIdLimit
val request = WorkerIntersectionRequest(
userId,
ks.map(_.candidateId).toArray,
featureTypes,
presetFeatureTypes,
intersectionIdLimit
)
val resultFuture = Future
.collect(
graphFeatureServiceWorkerClients.workers.map { worker =>
worker
.getIntersection(request)
.rescue {
case _: RequestTimeoutException =>
timeOutCounter.incr()
Future.value(DefaultWorkerIntersectionResponse)
case e =>
unknownErrorCounter.incr()
logger.error("Failure to load result.", e)
Future.value(DefaultWorkerIntersectionResponse)
}
}
).map { responses =>
Stat.time(aggregatorLatency) {
gfsIntersectionResponseAggregator(
responses,
calculatedFeatureTypes,
request.candidateUserIds,
intersectionIdLimit
)
}
}
ks.map { query =>
query -> resultFuture.map(_.get(query.candidateId))
}.toMap
}
}
/**
* Function to merge GfsIntersectionResponse from workers into one result.
*/
private def gfsIntersectionResponseAggregator(
responseList: Seq[WorkerIntersectionResponse],
features: Seq[FeatureType],
candidates: Seq[Long],
intersectionIdLimit: Int
): Map[Long, CachedIntersectionResult] = {
// Map of (candidate -> features -> type -> value)
val cube = Array.fill[Int](candidates.length, features.length, 3)(0)
// Map of (candidate -> features -> intersectionIds)
val ids = Array.fill[Option[ArrayBuffer[Long]]](candidates.length, features.length)(None)
val notZero = intersectionIdLimit != 0
for {
response <- responseList
(features, candidateIndex) <- response.results.zipWithIndex
(workerValue, featureIndex) <- features.zipWithIndex
} {
cube(candidateIndex)(featureIndex)(CountIndex) += workerValue.count
cube(candidateIndex)(featureIndex)(LeftDegreeIndex) += workerValue.leftNodeDegree
cube(candidateIndex)(featureIndex)(RightDegreeIndex) += workerValue.rightNodeDegree
if (notZero && workerValue.intersectionIds.nonEmpty) {
val arrayBuffer = ids(candidateIndex)(featureIndex) match {
case Some(buffer) => buffer
case None =>
val buffer = ArrayBuffer[Long]()
ids(candidateIndex)(featureIndex) = Some(buffer)
buffer
}
val intersectionIds = workerValue.intersectionIds
// Scan the intersectionId based on the Shard. The response order is consistent.
if (arrayBuffer.size < intersectionIdLimit) {
if (intersectionIds.size > intersectionIdLimit - arrayBuffer.size) {
arrayBuffer ++= intersectionIds.slice(0, intersectionIdLimit - arrayBuffer.size)
} else {
arrayBuffer ++= intersectionIds
}
}
}
}
candidates.zipWithIndex.map {
case (candidate, candidateIndex) =>
candidate -> CachedIntersectionResult(features.indices.map { featureIndex =>
WorkerIntersectionValue(
cube(candidateIndex)(featureIndex)(CountIndex),
cube(candidateIndex)(featureIndex)(LeftDegreeIndex),
cube(candidateIndex)(featureIndex)(RightDegreeIndex),
ids(candidateIndex)(featureIndex).getOrElse(Nil)
)
})
}.toMap
}
}
object GetIntersectionStore {
private[graph_feature_service] case class GetIntersectionQuery(
userId: Long,
candidateId: Long,
featureTypes: Seq[FeatureType],
presetFeatureTypes: PresetFeatureTypes,
featureTypesString: String,
calculatedFeatureTypes: Seq[FeatureType],
intersectionIdLimit: Int)
private[graph_feature_service] object GetIntersectionQuery {
def buildQueries(request: GetIntersectionRequest): Set[GetIntersectionQuery] = {
request.candidateUserIds.toSet.map { candidateId: Long =>
GetIntersectionQuery(
request.userId,
candidateId,
request.featureTypes,
request.presetFeatureTypes,
request.calculatedFeatureTypesString,
request.calculatedFeatureTypes,
request.intersectionIdLimit.getOrElse(DefaultIntersectionIdLimit)
)
}
}
}
// Don't return the intersectionId for better performance
private val DefaultIntersectionIdLimit = 0
private val DefaultWorkerIntersectionResponse = WorkerIntersectionResponse()
private val CountIndex = 0
private val LeftDegreeIndex = 1
private val RightDegreeIndex = 2
}

View File

@ -1,7 +0,0 @@
scala_library(
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
"graph-feature-service/src/main/thrift/com/twitter/graph_feature_service:graph_feature_service_thrift-scala",
],
)

View File

@ -1,58 +0,0 @@
package com.twitter.graph_feature_service.util
import com.twitter.graph_feature_service.thriftscala.EdgeType._
import com.twitter.graph_feature_service.thriftscala.{FeatureType, PresetFeatureTypes}
object FeatureTypesCalculator {
final val DefaultTwoHop = Seq(
FeatureType(Following, FollowedBy),
FeatureType(Following, FavoritedBy),
FeatureType(Following, RetweetedBy),
FeatureType(Following, MentionedBy),
FeatureType(Following, MutualFollow),
FeatureType(Favorite, FollowedBy),
FeatureType(Favorite, FavoritedBy),
FeatureType(Favorite, RetweetedBy),
FeatureType(Favorite, MentionedBy),
FeatureType(Favorite, MutualFollow),
FeatureType(MutualFollow, FollowedBy),
FeatureType(MutualFollow, FavoritedBy),
FeatureType(MutualFollow, RetweetedBy),
FeatureType(MutualFollow, MentionedBy),
FeatureType(MutualFollow, MutualFollow)
)
final val SocialProofTwoHop = Seq(FeatureType(Following, FollowedBy))
final val HtlTwoHop = DefaultTwoHop
final val WtfTwoHop = SocialProofTwoHop
final val SqTwoHop = DefaultTwoHop
final val RuxTwoHop = DefaultTwoHop
final val MRTwoHop = DefaultTwoHop
final val UserTypeaheadTwoHop = SocialProofTwoHop
final val presetFeatureTypes =
(HtlTwoHop ++ WtfTwoHop ++ SqTwoHop ++ RuxTwoHop ++ MRTwoHop ++ UserTypeaheadTwoHop).toSet
def getFeatureTypes(
presetFeatureTypes: PresetFeatureTypes,
featureTypes: Seq[FeatureType]
): Seq[FeatureType] = {
presetFeatureTypes match {
case PresetFeatureTypes.HtlTwoHop => HtlTwoHop
case PresetFeatureTypes.WtfTwoHop => WtfTwoHop
case PresetFeatureTypes.SqTwoHop => SqTwoHop
case PresetFeatureTypes.RuxTwoHop => RuxTwoHop
case PresetFeatureTypes.MrTwoHop => MRTwoHop
case PresetFeatureTypes.UserTypeaheadTwoHop => UserTypeaheadTwoHop
case _ => featureTypes
}
}
}

View File

@ -1,242 +0,0 @@
package com.twitter.graph_feature_service.util
import com.twitter.graph_feature_service.thriftscala.{
FeatureType,
IntersectionValue,
WorkerIntersectionValue
}
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
/**
* Functions for computing feature values based on the values returned by constantDB.
*/
object IntersectionValueCalculator {
/**
* Compute the size of the array in a ByteBuffer.
* Note that this function assumes the ByteBuffer is encoded using Injections.seqLong2ByteBuffer
*/
def computeArraySize(x: ByteBuffer): Int = {
x.remaining() >> 3 // divide 8
}
/**
*
*/
def apply(x: ByteBuffer, y: ByteBuffer, intersectionIdLimit: Int): WorkerIntersectionValue = {
val xSize = computeArraySize(x)
val ySize = computeArraySize(y)
val largerArray = if (xSize > ySize) x else y
val smallerArray = if (xSize > ySize) y else x
if (intersectionIdLimit == 0) {
val result = computeIntersectionUsingBinarySearchOnLargerByteBuffer(smallerArray, largerArray)
WorkerIntersectionValue(result, xSize, ySize)
} else {
val (result, ids) = computeIntersectionWithIds(smallerArray, largerArray, intersectionIdLimit)
WorkerIntersectionValue(result, xSize, ySize, ids)
}
}
/**
* Note that this function assumes the ByteBuffer is encoded using Injections.seqLong2ByteBuffer
*
*/
def computeIntersectionUsingBinarySearchOnLargerByteBuffer(
smallArray: ByteBuffer,
largeArray: ByteBuffer
): Int = {
var res: Int = 0
var i: Int = 0
while (i < smallArray.remaining()) {
if (binarySearch(largeArray, smallArray.getLong(i)) >= 0) {
res += 1
}
i += 8
}
res
}
def computeIntersectionWithIds(
smallArray: ByteBuffer,
largeArray: ByteBuffer,
intersectionLimit: Int
): (Int, Seq[Long]) = {
var res: Int = 0
var i: Int = 0
// Most of the intersectionLimit is smaller than default size: 16
val idBuffer = ArrayBuffer[Long]()
while (i < smallArray.remaining()) {
val value = smallArray.getLong(i)
if (binarySearch(largeArray, value) >= 0) {
res += 1
// Always get the smaller ids
if (idBuffer.size < intersectionLimit) {
idBuffer += value
}
}
i += 8
}
(res, idBuffer)
}
/**
* Note that this function assumes the ByteBuffer is encoded using Injections.seqLong2ByteBuffer
*
*/
private[util] def binarySearch(arr: ByteBuffer, value: Long): Int = {
var start = 0
var end = arr.remaining()
while (start <= end && start < arr.remaining()) {
val mid = ((start + end) >> 1) & ~7 // take mid - mid % 8
if (arr.getLong(mid) == value) {
return mid // return the index of the value
} else if (arr.getLong(mid) < value) {
start = mid + 8
} else {
end = mid - 1
}
}
// if not existed, return -1
-1
}
/**
* TODO: for now it only computes intersection size. Will add more feature types (e.g., dot
* product, maximum value).
*
* NOTE that this function assumes both x and y are SORTED arrays.
* In graph feature service, the sorting is done in the offline Scalding job.
*
* @param x source user's array
* @param y candidate user's array
* @param featureType feature type
* @return
*/
def apply(x: Array[Long], y: Array[Long], featureType: FeatureType): IntersectionValue = {
val xSize = x.length
val ySize = y.length
val intersection =
if (xSize.min(ySize) * math.log(xSize.max(ySize)) < (xSize + ySize).toDouble) {
if (xSize < ySize) {
computeIntersectionUsingBinarySearchOnLargerArray(x, y)
} else {
computeIntersectionUsingBinarySearchOnLargerArray(y, x)
}
} else {
computeIntersectionUsingListMerging(x, y)
}
IntersectionValue(
featureType,
Some(intersection.toInt),
None, // return None for now
Some(xSize),
Some(ySize)
)
}
/**
* Function for computing the intersections of two SORTED arrays by list merging.
*
* @param x one array
* @param y another array
* @param ordering ordering function for comparing values of T
* @tparam T type
* @return The intersection size and the list of intersected elements
*/
private[util] def computeIntersectionUsingListMerging[T](
x: Array[T],
y: Array[T]
)(
implicit ordering: Ordering[T]
): Int = {
var res: Int = 0
var i: Int = 0
var j: Int = 0
while (i < x.length && j < y.length) {
val comp = ordering.compare(x(i), y(j))
if (comp > 0) j += 1
else if (comp < 0) i += 1
else {
res += 1
i += 1
j += 1
}
}
res
}
/**
* Function for computing the intersections of two arrays by binary search on the larger array.
* Note that the larger array MUST be SORTED.
*
* @param smallArray smaller array
* @param largeArray larger array
* @param ordering ordering function for comparing values of T
* @tparam T type
*
* @return The intersection size and the list of intersected elements
*/
private[util] def computeIntersectionUsingBinarySearchOnLargerArray[T](
smallArray: Array[T],
largeArray: Array[T]
)(
implicit ordering: Ordering[T]
): Int = {
var res: Int = 0
var i: Int = 0
while (i < smallArray.length) {
val currentValue: T = smallArray(i)
if (binarySearch(largeArray, currentValue) >= 0) {
res += 1
}
i += 1
}
res
}
/**
* Function for doing the binary search
*
* @param arr array
* @param value the target value for searching
* @param ordering ordering function
* @tparam T type
* @return the index of element in the larger array.
* If there is no such element in the array, return -1.
*/
private[util] def binarySearch[T](
arr: Array[T],
value: T
)(
implicit ordering: Ordering[T]
): Int = {
var start = 0
var end = arr.length - 1
while (start <= end) {
val mid = (start + end) >> 1
val comp = ordering.compare(arr(mid), value)
if (comp == 0) {
return mid // return the index of the value
} else if (comp < 0) {
start = mid + 1
} else {
end = mid - 1
}
}
// if not existed, return -1
-1
}
}

View File

@ -1,30 +0,0 @@
scala_library(
sources = ["**/*.scala"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/javax/inject:javax.inject",
"3rdparty/jvm/net/codingwell:scala-guice",
"discovery-common/src/main/scala/com/twitter/discovery/common/stats",
"finatra-internal/decider/src/main/scala",
"finatra-internal/gizmoduck/src/main/scala",
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra/inject/inject-app/src/main/scala",
"finatra/inject/inject-core/src/main/scala",
"finatra/inject/inject-server/src/main/scala",
"finatra/inject/inject-thrift-client/src/main/scala",
"finatra/inject/inject-utils/src/main/scala",
"frigate/frigate-common:constdb_util",
"graph-feature-service/src/main/resources",
"graph-feature-service/src/main/scala/com/twitter/graph_feature_service/common",
"graph-feature-service/src/main/scala/com/twitter/graph_feature_service/util",
"graph-feature-service/src/main/thrift/com/twitter/graph_feature_service:graph_feature_service_thrift-scala",
"hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common",
"servo/request/src/main/scala",
"twitter-server-internal/src/main/scala",
"twitter-server/server/src/main/scala",
"util/util-app/src/main/scala",
"util/util-slf4j-api/src/main/scala",
],
)

View File

@ -1,58 +0,0 @@
package com.twitter.graph_feature_service.worker
import com.google.inject.Module
import com.twitter.finatra.decider.modules.DeciderModule
import com.twitter.finatra.gizmoduck.modules.TimerModule
import com.twitter.finatra.mtls.thriftmux.Mtls
import com.twitter.finatra.thrift.ThriftServer
import com.twitter.finatra.thrift.filters.{
LoggingMDCFilter,
StatsFilter,
ThriftMDCFilter,
TraceIdMDCFilter
}
import com.twitter.finatra.mtls.thriftmux.modules.MtlsThriftWebFormsModule
import com.twitter.finatra.thrift.routing.ThriftRouter
import com.twitter.graph_feature_service.thriftscala
import com.twitter.graph_feature_service.worker.controllers.WorkerController
import com.twitter.graph_feature_service.worker.handlers.WorkerWarmupHandler
import com.twitter.graph_feature_service.worker.modules.{
GraphContainerProviderModule,
WorkerFlagModule
}
import com.twitter.graph_feature_service.worker.util.GraphContainer
import com.twitter.inject.thrift.modules.ThriftClientIdModule
import com.twitter.util.Await
object Main extends WorkerMain
class WorkerMain extends ThriftServer with Mtls {
override val name = "graph_feature_service-worker"
override val modules: Seq[Module] = {
Seq(
WorkerFlagModule,
DeciderModule,
TimerModule,
ThriftClientIdModule,
GraphContainerProviderModule,
new MtlsThriftWebFormsModule[thriftscala.Worker.MethodPerEndpoint](this)
)
}
override def configureThrift(router: ThriftRouter): Unit = {
router
.filter[LoggingMDCFilter]
.filter[TraceIdMDCFilter]
.filter[ThriftMDCFilter]
.filter[StatsFilter]
.add[WorkerController]
}
override protected def warmup(): Unit = {
val graphContainer = injector.instance[GraphContainer]
Await.result(graphContainer.warmup)
handle[WorkerWarmupHandler]()
}
}

View File

@ -1,38 +0,0 @@
package com.twitter.graph_feature_service.worker.controllers
import com.twitter.discovery.common.stats.DiscoveryStatsFilter
import com.twitter.finagle.Service
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finatra.thrift.Controller
import com.twitter.graph_feature_service.thriftscala
import com.twitter.graph_feature_service.thriftscala.Worker.GetIntersection
import com.twitter.graph_feature_service.thriftscala._
import com.twitter.graph_feature_service.worker.handlers._
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class WorkerController @Inject() (
workerGetIntersectionHandler: WorkerGetIntersectionHandler
)(
implicit statsReceiver: StatsReceiver)
extends Controller(thriftscala.Worker) {
// use DiscoveryStatsFilter to filter out exceptions out of our control
private val getIntersectionService: Service[
WorkerIntersectionRequest,
WorkerIntersectionResponse
] =
new DiscoveryStatsFilter[WorkerIntersectionRequest, WorkerIntersectionResponse](
statsReceiver.scope("srv").scope("get_intersection")
).andThen(Service.mk(workerGetIntersectionHandler))
val getIntersection: Service[GetIntersection.Args, WorkerIntersectionResponse] = { args =>
getIntersectionService(args.request).onFailure { throwable =>
logger.error(s"Failure to get intersection for request $args.", throwable)
}
}
handle(GetIntersection) { getIntersection }
}

View File

@ -1,105 +0,0 @@
package com.twitter.graph_feature_service.worker.handlers
import com.twitter.finagle.stats.{Stat, StatsReceiver}
import com.twitter.graph_feature_service.thriftscala.{
WorkerIntersectionRequest,
WorkerIntersectionResponse,
WorkerIntersectionValue
}
import com.twitter.graph_feature_service.util.{FeatureTypesCalculator, IntersectionValueCalculator}
import com.twitter.graph_feature_service.util.IntersectionValueCalculator._
import com.twitter.graph_feature_service.worker.util.GraphContainer
import com.twitter.servo.request.RequestHandler
import com.twitter.util.Future
import java.nio.ByteBuffer
import javax.inject.{Inject, Singleton}
@Singleton
class WorkerGetIntersectionHandler @Inject() (
graphContainer: GraphContainer,
statsReceiver: StatsReceiver)
extends RequestHandler[WorkerIntersectionRequest, WorkerIntersectionResponse] {
import WorkerGetIntersectionHandler._
private val stats: StatsReceiver = statsReceiver.scope("srv/get_intersection")
private val numCandidatesCount = stats.counter("total_num_candidates")
private val toPartialGraphQueryStat = stats.stat("to_partial_graph_query_latency")
private val fromPartialGraphQueryStat = stats.stat("from_partial_graph_query_latency")
private val intersectionCalculationStat = stats.stat("computation_latency")
override def apply(request: WorkerIntersectionRequest): Future[WorkerIntersectionResponse] = {
numCandidatesCount.incr(request.candidateUserIds.length)
val userId = request.userId
// NOTE: do not change the order of candidates
val candidateIds = request.candidateUserIds
// NOTE: do not change the order of features
val featureTypes =
FeatureTypesCalculator.getFeatureTypes(request.presetFeatureTypes, request.featureTypes)
val leftEdges = featureTypes.map(_.leftEdgeType).distinct
val rightEdges = featureTypes.map(_.rightEdgeType).distinct
val rightEdgeMap = Stat.time(toPartialGraphQueryStat) {
rightEdges.map { rightEdge =>
val map = graphContainer.toPartialMap.get(rightEdge) match {
case Some(graph) =>
candidateIds.flatMap { candidateId =>
graph.apply(candidateId).map(candidateId -> _)
}.toMap
case None =>
Map.empty[Long, ByteBuffer]
}
rightEdge -> map
}.toMap
}
val leftEdgeMap = Stat.time(fromPartialGraphQueryStat) {
leftEdges.flatMap { leftEdge =>
graphContainer.toPartialMap.get(leftEdge).flatMap(_.apply(userId)).map(leftEdge -> _)
}.toMap
}
val res = Stat.time(intersectionCalculationStat) {
WorkerIntersectionResponse(
// NOTE that candidate ordering is important
candidateIds.map { candidateId =>
// NOTE that the featureTypes ordering is important
featureTypes.map {
featureType =>
val leftNeighborsOpt = leftEdgeMap.get(featureType.leftEdgeType)
val rightNeighborsOpt =
rightEdgeMap.get(featureType.rightEdgeType).flatMap(_.get(candidateId))
if (leftNeighborsOpt.isEmpty && rightNeighborsOpt.isEmpty) {
EmptyWorkerIntersectionValue
} else if (rightNeighborsOpt.isEmpty) {
EmptyWorkerIntersectionValue.copy(
leftNodeDegree = computeArraySize(leftNeighborsOpt.get)
)
} else if (leftNeighborsOpt.isEmpty) {
EmptyWorkerIntersectionValue.copy(
rightNodeDegree = computeArraySize(rightNeighborsOpt.get)
)
} else {
IntersectionValueCalculator(
leftNeighborsOpt.get,
rightNeighborsOpt.get,
request.intersectionIdLimit)
}
}
}
)
}
Future.value(res)
}
}
object WorkerGetIntersectionHandler {
val EmptyWorkerIntersectionValue: WorkerIntersectionValue = WorkerIntersectionValue(0, 0, 0, Nil)
}

View File

@ -1,14 +0,0 @@
package com.twitter.graph_feature_service.worker.handlers
import com.twitter.finatra.thrift.routing.ThriftWarmup
import com.twitter.inject.Logging
import com.twitter.inject.utils.Handler
import javax.inject.{Inject, Singleton}
@Singleton
class WorkerWarmupHandler @Inject() (warmup: ThriftWarmup) extends Handler with Logging {
override def handle(): Unit = {
info("Warmup Done!")
}
}

View File

@ -1,62 +0,0 @@
package com.twitter.graph_feature_service.worker.modules
import com.google.inject.Provides
import com.twitter.concurrent.AsyncSemaphore
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.graph_feature_service.common.Configs._
import com.twitter.graph_feature_service.worker.util
import com.twitter.graph_feature_service.worker.util.AutoUpdatingGraph
import com.twitter.graph_feature_service.worker.util.FollowedByPartialValueGraph
import com.twitter.graph_feature_service.worker.util.FollowingPartialValueGraph
import com.twitter.graph_feature_service.worker.util.GraphContainer
import com.twitter.graph_feature_service.worker.util.GraphKey
import com.twitter.graph_feature_service.worker.util.MutualFollowPartialValueGraph
import com.twitter.inject.TwitterModule
import com.twitter.inject.annotations.Flag
import com.twitter.util.Timer
import javax.inject.Singleton
object GraphContainerProviderModule extends TwitterModule {
@Provides
@Singleton
def provideAutoUpdatingGraphs(
@Flag(WorkerFlagNames.HdfsCluster) hdfsCluster: String,
@Flag(WorkerFlagNames.HdfsClusterUrl) hdfsClusterUrl: String,
@Flag(WorkerFlagNames.ShardId) shardId: Int
)(
implicit statsReceiver: StatsReceiver,
timer: Timer
): GraphContainer = {
// NOTE that we do not load some the graphs for saving RAM at this moment.
val enabledGraphPaths: Map[GraphKey, String] =
Map(
FollowingPartialValueGraph -> FollowOutValPath,
FollowedByPartialValueGraph -> FollowInValPath
)
// Only allow one graph to update at the same time.
val sharedSemaphore = new AsyncSemaphore(1)
val graphs: Map[GraphKey, AutoUpdatingGraph] =
enabledGraphPaths.map {
case (graphKey, path) =>
graphKey -> AutoUpdatingGraph(
dataPath = getHdfsPath(path),
hdfsCluster = hdfsCluster,
hdfsClusterUrl = hdfsClusterUrl,
shard = shardId,
minimumSizeForCompleteGraph = 1e6.toLong,
sharedSemaphore = Some(sharedSemaphore)
)(
statsReceiver
.scope("graphs")
.scope(graphKey.getClass.getSimpleName),
timer
)
}
util.GraphContainer(graphs)
}
}

View File

@ -1,33 +0,0 @@
package com.twitter.graph_feature_service.worker.modules
import com.twitter.inject.TwitterModule
object WorkerFlagNames {
final val ServiceRole = "service.role"
final val ServiceEnv = "service.env"
final val ShardId = "service.shardId"
final val NumShards = "service.numShards"
final val HdfsCluster = "service.hdfsCluster"
final val HdfsClusterUrl = "service.hdfsClusterUrl"
}
/**
* Initializes references to the flag values defined in the aurora.deploy file.
* To check what the flag values are initialized in runtime, search FlagsModule in stdout
*/
object WorkerFlagModule extends TwitterModule {
import WorkerFlagNames._
flag[Int](ShardId, "Shard Id")
flag[Int](NumShards, "Num of Graph Shards")
flag[String](ServiceRole, "Service Role")
flag[String](ServiceEnv, "Service Env")
flag[String](HdfsCluster, "Hdfs cluster to download graph files from")
flag[String](HdfsClusterUrl, "Hdfs cluster url to download graph files from")
}

View File

@ -1,69 +0,0 @@
package com.twitter.graph_feature_service.worker.util
import com.twitter.bijection.Injection
import com.twitter.concurrent.AsyncSemaphore
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.frigate.common.constdb_util.{
AutoUpdatingReadOnlyGraph,
ConstDBImporter,
Injections
}
import com.twitter.graph_feature_service.common.Configs
import com.twitter.util.{Duration, Future, Timer}
import java.nio.ByteBuffer
/**
* @param dataPath the path to the data on HDFS
* @param hdfsCluster cluster where we check for updates and download graph files from
* @param hdfsClusterUrl url to HDFS cluster
* @param shard The shard of the graph to download
* @param minimumSizeForCompleteGraph minimumSize for complete graph - otherwise we don't load it
* @param updateIntervalMin The interval after which the first update is tried and the interval between such updates
* @param updateIntervalMax the maximum time before an update is triggered
* @param deleteInterval The interval after which older data is deleted from disk
* @param sharedSemaphore The semaphore controls the number of graph loads at same time on the instance.
*/
case class AutoUpdatingGraph(
dataPath: String,
hdfsCluster: String,
hdfsClusterUrl: String,
shard: Int,
minimumSizeForCompleteGraph: Long,
updateIntervalMin: Duration = 1.hour,
updateIntervalMax: Duration = 12.hours,
deleteInterval: Duration = 2.seconds,
sharedSemaphore: Option[AsyncSemaphore] = None
)(
implicit statsReceiver: StatsReceiver,
timer: Timer)
extends AutoUpdatingReadOnlyGraph[Long, ByteBuffer](
hdfsCluster,
hdfsClusterUrl,
shard,
minimumSizeForCompleteGraph,
updateIntervalMin,
updateIntervalMax,
deleteInterval,
sharedSemaphore
)
with ConstDBImporter[Long, ByteBuffer] {
override def numGraphShards: Int = Configs.NumGraphShards
override def basePath: String = dataPath
override val keyInj: Injection[Long, ByteBuffer] = Injections.long2Varint
override val valueInj: Injection[ByteBuffer, ByteBuffer] = Injection.identity
override def get(targetId: Long): Future[Option[ByteBuffer]] =
super
.get(targetId)
.map { res =>
res.foreach(r => arraySizeStat.add(r.remaining()))
res
}
private val arraySizeStat = stats.scope("get").stat("size")
}

View File

@ -1,14 +0,0 @@
package com.twitter.graph_feature_service.worker.util
import com.twitter.graph_feature_service.thriftscala.EdgeType
sealed trait GfsQuery {
def edgeType: EdgeType
def userId: Long
}
/**
* Search for edges for any users to users in local partition.
*/
case class ToPartialQuery(edgeType: EdgeType, userId: Long) extends GfsQuery

View File

@ -1,19 +0,0 @@
package com.twitter.graph_feature_service.worker.util
import com.twitter.graph_feature_service.thriftscala.EdgeType
import com.twitter.util.Future
case class GraphContainer(
graphs: Map[GraphKey, AutoUpdatingGraph]) {
final val toPartialMap: Map[EdgeType, AutoUpdatingGraph] =
graphs.collect {
case (partialValueGraph: PartialValueGraph, graph) =>
partialValueGraph.edgeType -> graph
}
// load all the graphs from constantDB format to memory
def warmup: Future[Unit] = {
Future.collect(graphs.mapValues(_.warmup())).unit
}
}

View File

@ -1,32 +0,0 @@
package com.twitter.graph_feature_service.worker.util
import com.twitter.graph_feature_service.thriftscala.EdgeType
import com.twitter.graph_feature_service.thriftscala.EdgeType._
sealed trait GraphKey {
def edgeType: EdgeType
}
sealed trait PartialValueGraph extends GraphKey
/**
* Follow Graphs
*/
object FollowingPartialValueGraph extends PartialValueGraph {
override def edgeType: EdgeType = Following
}
object FollowedByPartialValueGraph extends PartialValueGraph {
override def edgeType: EdgeType = FollowedBy
}
/**
* Mutual Follow Graphs
*/
object MutualFollowPartialValueGraph extends PartialValueGraph {
override def edgeType: EdgeType = MutualFollow
}

View File

@ -1,16 +0,0 @@
package com.twitter.graph_feature_service.worker.util
//These classes are to help the GraphContainer choose the right data structure to answer queries
sealed trait GraphType
object FollowGraph extends GraphType
object FavoriteGraph extends GraphType
object RetweetGraph extends GraphType
object ReplyGraph extends GraphType
object MentionGraph extends GraphType
object MutualFollowGraph extends GraphType

View File

@ -1,66 +0,0 @@
scala_library(
platform = "java8",
tags = [
"bazel-compatible",
"bazel-only",
],
dependencies = [
"3rdparty/jvm/com/twitter/bijection:core",
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/constdb_util",
"graph-feature-service/src/main/scala/com/twitter/graph_feature_service/common",
"src/scala/com/twitter/interaction_graph/scio/agg_all:interaction_graph_history_aggregated_edge_snapshot-scala",
"src/scala/com/twitter/interaction_graph/scio/ml/scores:real_graph_in_scores-scala",
"src/scala/com/twitter/pluck/source/user_audits:user_audit_final-scala",
"src/scala/com/twitter/scalding_internal/dalv2",
"src/scala/com/twitter/scalding_internal/job",
"src/scala/com/twitter/scalding_internal/job/analytics_batch",
],
)
scalding_job(
name = "graph_feature_service_adhoc_job",
main = "com.twitter.graph_feature_service.scalding.GraphFeatureServiceAdhocApp",
args = [
"--date 2022-10-24",
],
config = [
("hadoop.map.jvm.total-memory", "3072m"),
("hadoop.reduce.jvm.total-memory", "3072m"),
("hadoop.submitter.jvm.total-memory", "5120m"),
("submitter.tier", "preemptible"),
],
contact = "recos-platform-alerts@twitter.com",
hadoop_cluster = "atla-proc",
hadoop_properties = [("mapreduce.job.hdfs-servers", "/atla/proc/user/cassowary")],
platform = "java8",
role = "cassowary",
runtime_platform = "java8",
tags = [
"bazel-compatible:migrated",
"bazel-only",
],
dependencies = [":scalding"],
)
scalding_job(
name = "graph_feature_service_daily_job",
main = "com.twitter.graph_feature_service.scalding.GraphFeatureServiceScheduledApp",
config = [
("hadoop.map.jvm.total-memory", "3072m"),
("hadoop.reduce.jvm.total-memory", "3072m"),
("hadoop.submitter.jvm.total-memory", "5120m"),
("submitter.tier", "preemptible"),
],
contact = "recos-platform-alerts@twitter.com",
cron = "01,31 * * * *",
hadoop_cluster = "atla-proc",
hadoop_properties = [("mapreduce.job.hdfs-servers", "/atla/proc/user/cassowary")],
platform = "java8",
role = "cassowary",
runtime_platform = "java8",
tags = [
"bazel-compatible:migrated",
"bazel-only",
],
dependencies = [":scalding"],
)

View File

@ -1,9 +0,0 @@
package com.twitter.graph_feature_service.scalding
case class EdgeFeature(
realGraphScore: Float,
followScore: Option[Float] = None,
mutualFollowScore: Option[Float] = None,
favoriteScore: Option[Float] = None,
retweetScore: Option[Float] = None,
mentionScore: Option[Float] = None)

View File

@ -1,85 +0,0 @@
package com.twitter.graph_feature_service.scalding
import com.twitter.scalding._
import com.twitter.scalding_internal.job.TwitterExecutionApp
import com.twitter.scalding_internal.job.analytics_batch.{
AnalyticsBatchExecution,
AnalyticsBatchExecutionArgs,
BatchDescription,
BatchFirstTime,
BatchIncrement,
TwitterScheduledExecutionApp
}
import java.util.TimeZone
/**
* Each job only needs to implement this runOnDateRange() function. It makes it easier for testing.
*/
trait GraphFeatureServiceBaseJob {
implicit val timeZone: TimeZone = DateOps.UTC
implicit val dateParser: DateParser = DateParser.default
def runOnDateRange(
enableValueGraphs: Option[Boolean] = None,
enableKeyGraphs: Option[Boolean] = None
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit]
/**
* Print customized counters in the log
*/
def printerCounters[T](execution: Execution[T]): Execution[Unit] = {
execution.getCounters
.flatMap {
case (_, counters) =>
counters.toMap.toSeq
.sortBy(e => (e._1.group, e._1.counter))
.foreach {
case (statKey, value) =>
println(s"${statKey.group}\t${statKey.counter}\t$value")
}
Execution.unit
}
}
}
/**
* Trait that wraps things about adhoc jobs.
*/
trait GraphFeatureServiceAdhocBaseApp extends TwitterExecutionApp with GraphFeatureServiceBaseJob {
override def job: Execution[Unit] = Execution.withId { implicit uniqueId =>
Execution.getArgs.flatMap { args: Args =>
implicit val dateRange: DateRange = DateRange.parse(args.list("date"))(timeZone, dateParser)
printerCounters(runOnDateRange())
}
}
}
/**
* Trait that wraps things about scheduled jobs.
*
* A new daily app only needs to declare the starting date.
*/
trait GraphFeatureServiceScheduledBaseApp
extends TwitterScheduledExecutionApp
with GraphFeatureServiceBaseJob {
def firstTime: RichDate // for example: RichDate("2018-02-21")
def batchIncrement: Duration = Days(1)
override def scheduledJob: Execution[Unit] = Execution.withId { implicit uniqueId =>
val analyticsArgs = AnalyticsBatchExecutionArgs(
batchDesc = BatchDescription(getClass.getName),
firstTime = BatchFirstTime(firstTime),
batchIncrement = BatchIncrement(batchIncrement)
)
AnalyticsBatchExecution(analyticsArgs) { implicit dateRange =>
printerCounters(runOnDateRange())
}
}
}

View File

@ -1,52 +0,0 @@
package com.twitter.graph_feature_service.scalding
import com.twitter.scalding.DateRange
import com.twitter.scalding.Execution
import com.twitter.scalding.RichDate
import com.twitter.scalding.UniqueID
import java.util.Calendar
import java.util.TimeZone
import sun.util.calendar.BaseCalendar
/**
* To launch an adhoc run:
*
scalding remote run --target graph-feature-service/src/main/scalding/com/twitter/graph_feature_service/scalding:graph_feature_service_adhoc_job
*/
object GraphFeatureServiceAdhocApp
extends GraphFeatureServiceMainJob
with GraphFeatureServiceAdhocBaseApp {}
/**
* To schedule the job, upload the workflows config (only required for the first time and subsequent config changes):
* scalding workflow upload --jobs graph-feature-service/src/main/scalding/com/twitter/graph_feature_service/scalding:graph_feature_service_daily_job --autoplay --build-cron-schedule "20 23 1 * *"
* You can then build from the UI by clicking "Build" and pasting in your remote branch, or leave it empty if you're redeploying from master.
* The workflows config above should automatically trigger once each month.
*/
object GraphFeatureServiceScheduledApp
extends GraphFeatureServiceMainJob
with GraphFeatureServiceScheduledBaseApp {
override def firstTime: RichDate = RichDate("2018-05-18")
override def runOnDateRange(
enableValueGraphs: Option[Boolean],
enableKeyGraphs: Option[Boolean]
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
// Only run the value Graphs on Tuesday, Thursday, Saturday
val overrideEnableValueGraphs = {
val dayOfWeek = dateRange.start.toCalendar.get(Calendar.DAY_OF_WEEK)
dayOfWeek == BaseCalendar.TUESDAY |
dayOfWeek == BaseCalendar.THURSDAY |
dayOfWeek == BaseCalendar.SATURDAY
}
super.runOnDateRange(
Some(true),
Some(false) // disable key Graphs since we are not using them in production
)
}
}

View File

@ -1,297 +0,0 @@
package com.twitter.graph_feature_service.scalding
import com.twitter.bijection.Injection
import com.twitter.frigate.common.constdb_util.Injections
import com.twitter.frigate.common.constdb_util.ScaldingUtil
import com.twitter.graph_feature_service.common.Configs
import com.twitter.graph_feature_service.common.Configs._
import com.twitter.interaction_graph.scio.agg_all.InteractionGraphHistoryAggregatedEdgeSnapshotScalaDataset
import com.twitter.interaction_graph.scio.ml.scores.RealGraphInScoresScalaDataset
import com.twitter.interaction_graph.thriftscala.FeatureName
import com.twitter.interaction_graph.thriftscala.{EdgeFeature => TEdgeFeature}
import com.twitter.pluck.source.user_audits.UserAuditFinalScalaDataset
import com.twitter.scalding.DateRange
import com.twitter.scalding.Days
import com.twitter.scalding.Execution
import com.twitter.scalding.Stat
import com.twitter.scalding.UniqueID
import com.twitter.scalding.typed.TypedPipe
import com.twitter.scalding_internal.dalv2.DAL
import com.twitter.scalding_internal.dalv2.remote_access.AllowCrossClusterSameDC
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.util.Time
import com.twitter.wtf.candidate.thriftscala.CandidateSeq
import java.nio.ByteBuffer
import java.util.TimeZone
trait GraphFeatureServiceMainJob extends GraphFeatureServiceBaseJob {
// keeping hdfsPath as a separate variable in order to override it in unit tests
protected val hdfsPath: String = BaseHdfsPath
protected def getShardIdForUser(userId: Long): Int = shardForUser(userId)
protected implicit val keyInj: Injection[Long, ByteBuffer] = Injections.long2Varint
protected implicit val valueInj: Injection[Long, ByteBuffer] = Injections.long2ByteBuffer
protected val bufferSize: Int = 1 << 26
protected val maxNumKeys: Int = 1 << 24
protected val numReducers: Int = NumGraphShards
protected val outputStreamBufferSize: Int = 1 << 26
protected final val shardingByKey = { (k: Long, _: Long) =>
getShardIdForUser(k)
}
protected final val shardingByValue = { (_: Long, v: Long) =>
getShardIdForUser(v)
}
private def writeGraphToDB(
graph: TypedPipe[(Long, Long)],
shardingFunction: (Long, Long) => Int,
path: String
)(
implicit dateRange: DateRange
): Execution[TypedPipe[(Int, Unit)]] = {
ScaldingUtil
.writeConstDB[Long, Long](
graph.withDescription(s"sharding $path"),
shardingFunction,
shardId =>
getTimedHdfsShardPath(
shardId,
getHdfsPath(path, Some(hdfsPath)),
Time.fromMilliseconds(dateRange.end.timestamp)
),
Int.MaxValue,
bufferSize,
maxNumKeys,
numReducers,
outputStreamBufferSize
)(
keyInj,
valueInj,
Ordering[(Long, Long)]
)
.forceToDiskExecution
}
def extractFeature(
featureList: Seq[TEdgeFeature],
featureName: FeatureName
): Option[Float] = {
featureList
.find(_.name == featureName)
.map(_.tss.ewma.toFloat)
.filter(_ > 0.0)
}
/**
* Function to extract a subgraph (e.g., follow graph) from real graph and take top K by real graph
* weight.
*
* @param input input real graph
* @param edgeFilter filter function to only get the edges needed (e.g., only follow edges)
* @param counter counter
* @return a subgroup that contains topK, e.g., follow graph for each user.
*/
private def getSubGraph(
input: TypedPipe[(Long, Long, EdgeFeature)],
edgeFilter: EdgeFeature => Boolean,
counter: Stat
): TypedPipe[(Long, Long)] = {
input
.filter(c => edgeFilter(c._3))
.map {
case (srcId, destId, features) =>
(srcId, (destId, features.realGraphScore))
}
.group
// auto reducer estimation only allocates 15 reducers, so setting an explicit number here
.withReducers(2000)
.sortedReverseTake(TopKRealGraph)(Ordering.by(_._2))
.flatMap {
case (srcId, topKNeighbors) =>
counter.inc()
topKNeighbors.map {
case (destId, _) =>
(srcId, destId)
}
}
}
def getMauIds()(implicit dateRange: DateRange, uniqueID: UniqueID): TypedPipe[Long] = {
val numMAUs = Stat("NUM_MAUS")
val uniqueMAUs = Stat("UNIQUE_MAUS")
DAL
.read(UserAuditFinalScalaDataset)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.collect {
case user_audit if user_audit.isValid =>
numMAUs.inc()
user_audit.userId
}
.distinct
.map { u =>
uniqueMAUs.inc()
u
}
}
def getRealGraphWithMAUOnly(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): TypedPipe[(Long, Long, EdgeFeature)] = {
val numMAUs = Stat("NUM_MAUS")
val uniqueMAUs = Stat("UNIQUE_MAUS")
val monthlyActiveUsers = DAL
.read(UserAuditFinalScalaDataset)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.collect {
case user_audit if user_audit.isValid =>
numMAUs.inc()
user_audit.userId
}
.distinct
.map { u =>
uniqueMAUs.inc()
u
}
.asKeys
val realGraphAggregates = DAL
.readMostRecentSnapshot(
InteractionGraphHistoryAggregatedEdgeSnapshotScalaDataset,
dateRange.embiggen(Days(5)))
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { edge =>
val featureList = edge.features
val edgeFeature = EdgeFeature(
edge.weight.getOrElse(0.0).toFloat,
extractFeature(featureList, FeatureName.NumMutualFollows),
extractFeature(featureList, FeatureName.NumFavorites),
extractFeature(featureList, FeatureName.NumRetweets),
extractFeature(featureList, FeatureName.NumMentions)
)
(edge.sourceId, (edge.destinationId, edgeFeature))
}
.join(monthlyActiveUsers)
.map {
case (srcId, ((destId, feature), _)) =>
(destId, (srcId, feature))
}
.join(monthlyActiveUsers)
.map {
case (destId, ((srcId, feature), _)) =>
(srcId, destId, feature)
}
realGraphAggregates
}
def getTopKFollowGraph(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): TypedPipe[(Long, Long)] = {
val followGraphMauStat = Stat("NumFollowEdges_MAU")
val mau: TypedPipe[Long] = getMauIds()
DAL
.readMostRecentSnapshot(RealGraphInScoresScalaDataset, dateRange.embiggen(Days(7)))
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.groupBy(_.key)
.join(mau.asKeys)
.withDescription("filtering srcId by mau")
.flatMap {
case (_, (KeyVal(srcId, CandidateSeq(candidates)), _)) =>
followGraphMauStat.inc()
val topK = candidates.sortBy(-_.score).take(TopKRealGraph)
topK.map { c => (srcId, c.userId) }
}
}
override def runOnDateRange(
enableValueGraphs: Option[Boolean],
enableKeyGraphs: Option[Boolean]
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val processValueGraphs = enableValueGraphs.getOrElse(Configs.EnableValueGraphs)
val processKeyGraphs = enableKeyGraphs.getOrElse(Configs.EnableKeyGraphs)
if (!processKeyGraphs && !processValueGraphs) {
// Skip the batch job
Execution.unit
} else {
// val favoriteGraphStat = Stat("NumFavoriteEdges")
// val retweetGraphStat = Stat("NumRetweetEdges")
// val mentionGraphStat = Stat("NumMentionEdges")
// val realGraphAggregates = getRealGraphWithMAUOnly
val followGraph = getTopKFollowGraph
// val mutualFollowGraph = followGraph.asKeys.join(followGraph.swap.asKeys).keys
// val favoriteGraph =
// getSubGraph(realGraphAggregates, _.favoriteScore.isDefined, favoriteGraphStat)
// val retweetGraph =
// getSubGraph(realGraphAggregates, _.retweetScore.isDefined, retweetGraphStat)
// val mentionGraph =
// getSubGraph(realGraphAggregates, _.mentionScore.isDefined, mentionGraphStat)
val writeValDataSetExecutions = if (processValueGraphs) {
Seq(
(followGraph, shardingByValue, FollowOutValPath),
(followGraph.swap, shardingByValue, FollowInValPath)
// (mutualFollowGraph, shardingByValue, MutualFollowValPath),
// (favoriteGraph, shardingByValue, FavoriteOutValPath),
// (favoriteGraph.swap, shardingByValue, FavoriteInValPath),
// (retweetGraph, shardingByValue, RetweetOutValPath),
// (retweetGraph.swap, shardingByValue, RetweetInValPath),
// (mentionGraph, shardingByValue, MentionOutValPath),
// (mentionGraph.swap, shardingByValue, MentionInValPath)
)
} else {
Seq.empty
}
val writeKeyDataSetExecutions = if (processKeyGraphs) {
Seq(
(followGraph, shardingByKey, FollowOutKeyPath),
(followGraph.swap, shardingByKey, FollowInKeyPath)
// (favoriteGraph, shardingByKey, FavoriteOutKeyPath),
// (favoriteGraph.swap, shardingByKey, FavoriteInKeyPath),
// (retweetGraph, shardingByKey, RetweetOutKeyPath),
// (retweetGraph.swap, shardingByKey, RetweetInKeyPath),
// (mentionGraph, shardingByKey, MentionOutKeyPath),
// (mentionGraph.swap, shardingByKey, MentionInKeyPath),
// (mutualFollowGraph, shardingByKey, MutualFollowKeyPath)
)
} else {
Seq.empty
}
Execution
.sequence((writeValDataSetExecutions ++ writeKeyDataSetExecutions).map {
case (graph, shardingMethod, path) =>
writeGraphToDB(graph, shardingMethod, path)
}).unit
}
}
}

View File

@ -1,27 +0,0 @@
scala_library(
platform = "java8",
tags = ["bazel-only"],
dependencies = [
"3rdparty/jvm/com/twitter/bijection:core",
"3rdparty/jvm/com/twitter/bijection:scrooge",
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/constdb_util",
"src/java/com/twitter/ml/api:api-base",
"src/scala/com/twitter/ml/api:api-base",
"src/scala/com/twitter/scalding_internal/job",
"src/scala/com/twitter/scalding_internal/job/analytics_batch",
"src/thrift/com/twitter/ml/api:data-java",
],
)
hadoop_binary(
name = "gfs_random_request-adhoc",
main = "com.twitter.graph_feature_service.scalding.adhoc.RandomRequestGenerationApp",
platform = "java8",
runtime_platform = "java8",
tags = [
"bazel-compatible",
"bazel-compatible:migrated",
"bazel-only",
],
dependencies = [":adhoc"],
)

View File

@ -1,77 +0,0 @@
package com.twitter.graph_feature_service.scalding.adhoc
import com.twitter.bijection.Injection
import com.twitter.frigate.common.constdb_util.Injections
import com.twitter.ml.api.Feature.Discrete
import com.twitter.ml.api.{DailySuffixFeatureSource, DataSetPipe, RichDataRecord}
import com.twitter.scalding._
import com.twitter.scalding_internal.job.TwitterExecutionApp
import java.nio.ByteBuffer
import java.util.TimeZone
object RandomRequestGenerationJob {
implicit val timeZone: TimeZone = DateOps.UTC
implicit val dateParser: DateParser = DateParser.default
val timelineRecapDataSetPath: String =
"/atla/proc2/user/timelines/processed/suggests/recap/data_records"
val USER_ID = new Discrete("meta.user_id")
val AUTHOR_ID = new Discrete("meta.author_id")
val timelineRecapOutPutPath: String = "/user/cassowary/gfs/adhoc/timeline_data"
implicit val inj: Injection[Long, ByteBuffer] = Injections.long2Varint
def run(
dataSetPath: String,
outPutPath: String,
numOfPairsToTake: Int
)(
implicit dateRange: DateRange,
uniqueID: UniqueID
): Execution[Unit] = {
val NumUserAuthorPairs = Stat("NumUserAuthorPairs")
val dataSet: DataSetPipe = DailySuffixFeatureSource(dataSetPath).read
val userAuthorPairs: TypedPipe[(Long, Long)] = dataSet.records.map { record =>
val richRecord = new RichDataRecord(record, dataSet.featureContext)
val userId = richRecord.getFeatureValue(USER_ID)
val authorId = richRecord.getFeatureValue(AUTHOR_ID)
NumUserAuthorPairs.inc()
(userId, authorId)
}
userAuthorPairs
.limit(numOfPairsToTake)
.writeExecution(
TypedTsv[(Long, Long)](outPutPath)
)
}
}
/**
* ./bazel bundle graph-feature-service/src/main/scalding/com/twitter/graph_feature_service/scalding/adhoc:all
*
* oscar hdfs --screen --user cassowary --tee gfs_log --bundle gfs_random_request-adhoc \
--tool com.twitter.graph_feature_service.scalding.adhoc.RandomRequestGenerationApp \
-- --date 2018-08-11 \
--input /atla/proc2/user/timelines/processed/suggests/recap/data_records \
--output /user/cassowary/gfs/adhoc/timeline_data
*/
object RandomRequestGenerationApp extends TwitterExecutionApp {
import RandomRequestGenerationJob._
override def job: Execution[Unit] = Execution.withId { implicit uniqueId =>
Execution.getArgs.flatMap { args: Args =>
implicit val dateRange: DateRange = DateRange.parse(args.list("date"))(timeZone, dateParser)
run(
args.optional("input").getOrElse(timelineRecapDataSetPath),
args.optional("output").getOrElse(timelineRecapOutPutPath),
args.int("num_pairs", 3000)
)
}
}
}

View File

@ -1,15 +0,0 @@
create_thrift_libraries(
base_name = "graph_feature_service_thrift",
sources = ["*.thrift"],
platform = "java8",
tags = ["bazel-compatible"],
generate_languages = [
"java",
# ruby is added due to ruby dependees in timelines
"ruby",
"scala",
"strato",
],
provides_java_name = "graph_feature_service_thrift_java",
provides_scala_name = "graph_feature_service_thrift_scala",
)

View File

@ -1,123 +0,0 @@
namespace java com.twitter.graph_feature_service.thriftjava
#@namespace scala com.twitter.graph_feature_service.thriftscala
#@namespace strato com.twitter.graph_feature_service.thriftscala
// edge type to differentiate different types of graphs (we can also add a lot of other types of edges)
enum EdgeType {
FOLLOWING,
FOLLOWED_BY,
FAVORITE,
FAVORITED_BY,
RETWEET,
RETWEETED_BY,
REPLY,
REPLYED_BY,
MENTION,
MENTIONED_BY,
MUTUAL_FOLLOW,
SIMILAR_TO, // more edge types (like block, report, etc.) can be supported later.
RESERVED_12,
RESERVED_13,
RESERVED_14,
RESERVED_15,
RESERVED_16,
RESERVED_17,
RESERVED_18,
RESERVED_19,
RESERVED_20
}
enum PresetFeatureTypes {
EMPTY,
HTL_TWO_HOP,
WTF_TWO_HOP,
SQ_TWO_HOP,
RUX_TWO_HOP,
MR_TWO_HOP,
USER_TYPEAHEAD_TWO_HOP
}
struct UserWithCount {
1: required i64 userId(personalDataType = 'UserId')
2: required i32 count
}(hasPersonalData = 'true')
struct UserWithScore {
1: required i64 userId(personalDataType = 'UserId')
2: required double score
}(hasPersonalData = 'true')
// Feature Type
// For example, to compute how many of source user's following's have favorited candidate user,
// we need to compute the intersection between source user's FOLLOWING edges, and candidate user's
// FAVORITED_BY edge. In this case, we should user FeatureType(FOLLOWING, FAVORITED_BY)
struct FeatureType {
1: required EdgeType leftEdgeType // edge type from source user
2: required EdgeType rightEdgeType // edge type from candidate user
}(persisted="true")
struct IntersectionValue {
1: required FeatureType featureType
2: optional i32 count
3: optional list<i64> intersectionIds(personalDataType = 'UserId')
4: optional i32 leftNodeDegree
5: optional i32 rightNodeDegree
}(persisted="true", hasPersonalData = 'true')
struct GfsIntersectionResult {
1: required i64 candidateUserId(personalDataType = 'UserId')
2: required list<IntersectionValue> intersectionValues
}(hasPersonalData = 'true')
struct GfsIntersectionRequest {
1: required i64 userId(personalDataType = 'UserId')
2: required list<i64> candidateUserIds(personalDataType = 'UserId')
3: required list<FeatureType> featureTypes
4: optional i32 intersectionIdLimit
}
struct GfsPresetIntersectionRequest {
1: required i64 userId(personalDataType = 'UserId')
2: required list<i64> candidateUserIds(personalDataType = 'UserId')
3: required PresetFeatureTypes presetFeatureTypes
4: optional i32 intersectionIdLimit
}(hasPersonalData = 'true')
struct GfsIntersectionResponse {
1: required list<GfsIntersectionResult> results
}
service Server {
GfsIntersectionResponse getIntersection(1: GfsIntersectionRequest request)
GfsIntersectionResponse getPresetIntersection(1: GfsPresetIntersectionRequest request)
}
###################################################################################################
## For internal usage only
###################################################################################################
struct WorkerIntersectionRequest {
1: required i64 userId(personalDataType = 'UserId')
2: required list<i64> candidateUserIds(personalDataType = 'UserId')
3: required list<FeatureType> featureTypes
4: required PresetFeatureTypes presetFeatureTypes
5: required i32 intersectionIdLimit
}(hasPersonalData = 'true')
struct WorkerIntersectionResponse {
1: required list<list<WorkerIntersectionValue>> results
}
struct WorkerIntersectionValue {
1: i32 count
2: i32 leftNodeDegree
3: i32 rightNodeDegree
4: list<i64> intersectionIds(personalDataType = 'UserId')
}(hasPersonalData = 'true')
struct CachedIntersectionResult {
1: required list<WorkerIntersectionValue> values
}
service Worker {
WorkerIntersectionResponse getIntersection(1: WorkerIntersectionRequest request)
}