[docx] split commit for file 1400

Signed-off-by: Ari Archer <ari.web.xyz@gmail.com>
This commit is contained in:
Ari Archer 2024-01-23 19:07:18 +02:00
parent c80f53f99d
commit e27b2e31c3
No known key found for this signature in database
GPG Key ID: A50D5B4B599AF8A2
400 changed files with 0 additions and 12636 deletions

View File

@ -1,31 +0,0 @@
package com.twitter.graph_feature_service.server.modules
import com.twitter.inject.TwitterModule
/**
 * Names of the command-line flags that ServerFlagsModule registers.
 *
 * Every member is a literal string constant holding the flag name; the flag's type and help
 * text are declared where the flag is registered.
 */
object ServerFlagNames {
// Registered as an Int flag ("Num of workers").
final val NumWorkers = "service.num_workers"
// Registered as a String flag ("Service Role").
final val ServiceRole = "service.role"
// Registered as a String flag ("Service Env").
final val ServiceEnv = "service.env"
// Registered as a String flag ("MemCache Client Name").
final val MemCacheClientName = "service.mem_cache_client_name"
// Registered as a String flag ("MemCache Path").
final val MemCachePath = "service.mem_cache_path"
}
/**
 * Registers the server's command-line flags so they can be injected by name.
 *
 * The values themselves are supplied at launch time (they are defined in the aurora.deploy
 * file). To see which values were picked up at runtime, search for FlagsModule in stdout.
 */
object ServerFlagsModule extends TwitterModule {

  // Sizing of the worker fleet this server talks to.
  flag[Int](ServerFlagNames.NumWorkers, "Num of workers")

  // Identity of the running service instance.
  flag[String](ServerFlagNames.ServiceRole, "Service Role")
  flag[String](ServerFlagNames.ServiceEnv, "Service Env")

  // Memcache client settings.
  flag[String](ServerFlagNames.MemCacheClientName, "MemCache Client Name")
  flag[String](ServerFlagNames.MemCachePath, "MemCache Path")
}

View File

@ -1,16 +0,0 @@
package com.twitter.graph_feature_service.server.stores
import com.twitter.graph_feature_service.common.Configs.RandomSeed
import com.twitter.graph_feature_service.thriftscala.FeatureType
import scala.util.hashing.MurmurHash3
/**
 * Turns a sequence of feature types into a compact string key.
 */
object FeatureTypesEncoder {

  /**
   * Hashes the given feature types into a non-negative integer rendered as a decimal string.
   *
   * Each feature type contributes two bytes, in order: the value of its left edge type followed
   * by the value of its right edge type. The order of `featureTypes` therefore affects the result.
   *
   * @param featureTypes feature types to encode
   * @return the MurmurHash3 of the encoded bytes (seeded with RandomSeed), sign bit cleared,
   *         as a string
   */
  def apply(featureTypes: Seq[FeatureType]): String = {
    val edgeBytes: Array[Byte] = featureTypes.iterator.flatMap { featureType =>
      Iterator(
        featureType.leftEdgeType.getValue.toByte,
        featureType.rightEdgeType.getValue.toByte
      )
    }.toArray

    val hash = MurmurHash3.bytesHash(edgeBytes, RandomSeed)
    // Clearing the sign bit keeps the rendered number positive.
    (hash & Int.MaxValue).toString
  }
}

View File

@ -1,181 +0,0 @@
package com.twitter.graph_feature_service.server.stores
import com.twitter.finagle.RequestTimeoutException
import com.twitter.finagle.stats.{Stat, StatsReceiver}
import com.twitter.graph_feature_service.server.handlers.ServerGetIntersectionHandler.GetIntersectionRequest
import com.twitter.graph_feature_service.server.modules.GraphFeatureServiceWorkerClients
import com.twitter.graph_feature_service.server.stores.GetIntersectionStore.GetIntersectionQuery
import com.twitter.graph_feature_service.thriftscala._
import com.twitter.inject.Logging
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Future
import javax.inject.Singleton
import scala.collection.mutable.ArrayBuffer
/**
 * Read-only store that resolves intersection queries by sending a single request to every
 * worker client and merging the per-worker responses into one result per candidate.
 */
@Singleton
case class GetIntersectionStore(
  graphFeatureServiceWorkerClients: GraphFeatureServiceWorkerClients,
  statsReceiver: StatsReceiver)
    extends ReadableStore[GetIntersectionQuery, CachedIntersectionResult]
    with Logging {

  import GetIntersectionStore._

  private val stats = statsReceiver.scope("get_intersection_store")
  private val requestCount = stats.counter(name = "request_count")
  private val aggregatorLatency = stats.stat("aggregator_latency")
  private val timeOutCounter = stats.counter("worker_timeouts")
  private val unknownErrorCounter = stats.counter("unknown_errors")

  /**
   * Resolves a batch of queries with one fan-out to all workers.
   *
   * The user id, feature types, preset and id limit are taken from an arbitrary query of the
   * batch; every query in `ks` is assumed to share them. A worker that times out or fails
   * contributes a default (empty) response instead of failing the batch.
   *
   * @param ks queries to resolve; only their candidate ids are expected to differ
   * @return one future per query, holding the merged result for that query's candidate
   */
  override def multiGet[K1 <: GetIntersectionQuery](
    ks: Set[K1]
  ): Map[K1, Future[Option[CachedIntersectionResult]]] = {
    ks.headOption match {
      case None =>
        Map.empty

      case Some(template) =>
        requestCount.incr()

        // The array fixes the candidate order that workers must echo back in their results.
        val workerRequest = WorkerIntersectionRequest(
          template.userId,
          ks.map(_.candidateId).toArray,
          template.featureTypes,
          template.presetFeatureTypes,
          template.intersectionIdLimit
        )

        val workerCalls = graphFeatureServiceWorkerClients.workers.map { worker =>
          worker
            .getIntersection(workerRequest)
            .rescue {
              case _: RequestTimeoutException =>
                timeOutCounter.incr()
                Future.value(DefaultWorkerIntersectionResponse)
              case e =>
                unknownErrorCounter.incr()
                logger.error("Failure to load result.", e)
                Future.value(DefaultWorkerIntersectionResponse)
            }
        }

        val mergedByCandidate = Future.collect(workerCalls).map { responses =>
          Stat.time(aggregatorLatency) {
            gfsIntersectionResponseAggregator(
              responses,
              template.calculatedFeatureTypes,
              workerRequest.candidateUserIds,
              template.intersectionIdLimit
            )
          }
        }

        (for (query <- ks)
          yield query -> mergedByCandidate.map(_.get(query.candidateId))).toMap
    }
  }

  /**
   * Merges the responses of all workers into one result per candidate.
   *
   * Counts and degrees are summed across workers. Intersection ids are concatenated in response
   * order until `intersectionIdLimit` ids have been gathered; none are gathered when the limit
   * is 0.
   *
   * @param responseList        worker responses; results are indexed by candidate, then feature
   * @param features            feature types, in the order workers report them
   * @param candidates          candidate ids, in the order workers report them
   * @param intersectionIdLimit maximum number of intersection ids kept per (candidate, feature)
   * @return merged result keyed by candidate id
   */
  private def gfsIntersectionResponseAggregator(
    responseList: Seq[WorkerIntersectionResponse],
    features: Seq[FeatureType],
    candidates: Seq[Long],
    intersectionIdLimit: Int
  ): Map[Long, CachedIntersectionResult] = {
    // totals(candidate)(feature) holds the three running sums: count, left degree, right degree.
    val totals = Array.fill[Int](candidates.length, features.length, 3)(0)
    // collectedIds(candidate)(feature) is created lazily, the first time a worker reports ids.
    val collectedIds =
      Array.fill[Option[ArrayBuffer[Long]]](candidates.length, features.length)(None)
    val idsRequested = intersectionIdLimit != 0

    responseList.foreach { response =>
      response.results.zipWithIndex.foreach {
        case (valuesForCandidate, candidateIndex) =>
          valuesForCandidate.zipWithIndex.foreach {
            case (workerValue, featureIndex) =>
              val cell = totals(candidateIndex)(featureIndex)
              cell(CountIndex) += workerValue.count
              cell(LeftDegreeIndex) += workerValue.leftNodeDegree
              cell(RightDegreeIndex) += workerValue.rightNodeDegree

              val workerIds = workerValue.intersectionIds
              if (idsRequested && workerIds.nonEmpty) {
                val merged = collectedIds(candidateIndex)(featureIndex).getOrElse {
                  val fresh = ArrayBuffer[Long]()
                  collectedIds(candidateIndex)(featureIndex) = Some(fresh)
                  fresh
                }
                // Ids are taken shard by shard; the response order is consistent.
                val room = intersectionIdLimit - merged.size
                if (room > 0) {
                  merged ++= workerIds.take(room)
                }
              }
          }
      }
    }

    candidates.zipWithIndex.map {
      case (candidate, candidateIndex) =>
        val mergedValues = features.indices.map { featureIndex =>
          val cell = totals(candidateIndex)(featureIndex)
          WorkerIntersectionValue(
            cell(CountIndex),
            cell(LeftDegreeIndex),
            cell(RightDegreeIndex),
            collectedIds(candidateIndex)(featureIndex).getOrElse(Nil)
          )
        }
        candidate -> CachedIntersectionResult(mergedValues)
    }.toMap
  }
}
/**
 * Query type and shared constants for the intersection store.
 */
object GetIntersectionStore {

  /**
   * One (user, candidate) lookup together with everything needed to build the worker request.
   *
   * @param userId                 source user
   * @param candidateId            candidate user the source user is compared against
   * @param featureTypes           feature types as supplied in the request
   * @param presetFeatureTypes     preset supplied in the request
   * @param featureTypesString     the request's calculatedFeatureTypesString
   * @param calculatedFeatureTypes the request's calculatedFeatureTypes
   * @param intersectionIdLimit    maximum number of intersection ids to return
   */
  private[graph_feature_service] case class GetIntersectionQuery(
    userId: Long,
    candidateId: Long,
    featureTypes: Seq[FeatureType],
    presetFeatureTypes: PresetFeatureTypes,
    featureTypesString: String,
    calculatedFeatureTypes: Seq[FeatureType],
    intersectionIdLimit: Int)

  private[graph_feature_service] object GetIntersectionQuery {

    /**
     * Builds one query per distinct candidate id of the request.
     *
     * @param request incoming intersection request
     * @return the queries, all sharing the request-level fields
     */
    def buildQueries(request: GetIntersectionRequest): Set[GetIntersectionQuery] = {
      val idLimit = request.intersectionIdLimit.getOrElse(DefaultIntersectionIdLimit)
      val distinctCandidateIds: Set[Long] = request.candidateUserIds.toSet

      distinctCandidateIds.map { candidateId =>
        GetIntersectionQuery(
          userId = request.userId,
          candidateId = candidateId,
          featureTypes = request.featureTypes,
          presetFeatureTypes = request.presetFeatureTypes,
          featureTypesString = request.calculatedFeatureTypesString,
          calculatedFeatureTypes = request.calculatedFeatureTypes,
          intersectionIdLimit = idLimit
        )
      }
    }
  }

  // A limit of 0 means no intersection ids are returned, which is cheaper.
  private val DefaultIntersectionIdLimit = 0

  // Stand-in response for a worker that timed out or failed.
  private val DefaultWorkerIntersectionResponse = WorkerIntersectionResponse()

  // Positions of the three summed values inside the aggregation array.
  private val CountIndex = 0
  private val LeftDegreeIndex = 1
  private val RightDegreeIndex = 2
}

View File

@ -1,7 +0,0 @@
# Scala library target for the sources in this directory.
# No `sources` attribute is given, so the build tool's default source selection applies
# (NOTE(review): presumably every Scala file in this directory - confirm against build defaults).
scala_library(
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
# Thrift-generated Scala bindings of the graph feature service.
"graph-feature-service/src/main/thrift/com/twitter/graph_feature_service:graph_feature_service_thrift-scala",
],
)

View File

@ -1,58 +0,0 @@
package com.twitter.graph_feature_service.util
import com.twitter.graph_feature_service.thriftscala.EdgeType._
import com.twitter.graph_feature_service.thriftscala.{FeatureType, PresetFeatureTypes}
/**
 * Feature type presets and the lookup that resolves a preset to its feature types.
 */
object FeatureTypesCalculator {

  /**
   * Every left edge type paired with every right edge type, ordered by left edge type first
   * and right edge type second (15 feature types in total).
   */
  final val DefaultTwoHop = {
    val leftEdgeTypes = Seq(Following, Favorite, MutualFollow)
    val rightEdgeTypes = Seq(FollowedBy, FavoritedBy, RetweetedBy, MentionedBy, MutualFollow)
    for {
      left <- leftEdgeTypes
      right <- rightEdgeTypes
    } yield FeatureType(left, right)
  }

  /** The single Following -> FollowedBy feature type. */
  final val SocialProofTwoHop = Seq(FeatureType(Following, FollowedBy))

  // Presets backed by the full default list.
  final val HtlTwoHop = DefaultTwoHop
  final val SqTwoHop = DefaultTwoHop
  final val RuxTwoHop = DefaultTwoHop
  final val MRTwoHop = DefaultTwoHop

  // Presets backed by the social-proof list.
  final val WtfTwoHop = SocialProofTwoHop
  final val UserTypeaheadTwoHop = SocialProofTwoHop

  /** Union of the feature types used by any preset. */
  final val presetFeatureTypes =
    Seq(HtlTwoHop, WtfTwoHop, SqTwoHop, RuxTwoHop, MRTwoHop, UserTypeaheadTwoHop).flatten.toSet

  /**
   * Resolves the feature types to compute for a request.
   *
   * @param presetFeatureTypes preset named in the request
   * @param featureTypes       explicit feature types, used when the preset is not one of the
   *                           known presets
   * @return the preset's feature types, or `featureTypes` as the fallback
   */
  def getFeatureTypes(
    presetFeatureTypes: PresetFeatureTypes,
    featureTypes: Seq[FeatureType]
  ): Seq[FeatureType] =
    presetFeatureTypes match {
      case PresetFeatureTypes.HtlTwoHop => HtlTwoHop
      case PresetFeatureTypes.WtfTwoHop => WtfTwoHop
      case PresetFeatureTypes.SqTwoHop => SqTwoHop
      case PresetFeatureTypes.RuxTwoHop => RuxTwoHop
      case PresetFeatureTypes.MrTwoHop => MRTwoHop
      case PresetFeatureTypes.UserTypeaheadTwoHop => UserTypeaheadTwoHop
      case _ => featureTypes
    }
}

View File

@ -1,242 +0,0 @@
package com.twitter.graph_feature_service.util
import com.twitter.graph_feature_service.thriftscala.{
FeatureType,
IntersectionValue,
WorkerIntersectionValue
}
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
/**
 * Functions for computing feature values based on the values returned by constantDB.
 *
 * The ByteBuffer variants treat the bytes between the buffer's position and limit as a packed,
 * SORTED array of 8-byte longs (the layout produced by Injections.seqLong2ByteBuffer). They read
 * with absolute indexing and never move the buffer's position. Trailing bytes that do not form a
 * complete long are ignored, consistently with [[computeArraySize]].
 */
object IntersectionValueCalculator {

  /** Number of bytes occupied by one encoded Long. */
  private val BytesPerLong = 8

  /**
   * Compute the size of the array in a ByteBuffer.
   * Note that this function assumes the ByteBuffer is encoded using Injections.seqLong2ByteBuffer
   *
   * @param x encoded array
   * @return number of complete longs between the buffer's position and limit
   */
  def computeArraySize(x: ByteBuffer): Int = {
    x.remaining() >> 3 // divide by the 8 bytes of a Long
  }

  /**
   * Computes the intersection of two encoded, sorted arrays.
   *
   * @param x                   source user's encoded array
   * @param y                   candidate user's encoded array
   * @param intersectionIdLimit maximum number of intersected ids to return; 0 skips id
   *                            collection entirely and only counts
   * @return intersection count, the size of `x`, the size of `y` and, when requested, the first
   *         intersected ids in the order of the smaller array
   */
  def apply(x: ByteBuffer, y: ByteBuffer, intersectionIdLimit: Int): WorkerIntersectionValue = {
    val xSize = computeArraySize(x)
    val ySize = computeArraySize(y)
    // On equal sizes x is scanned and y is searched.
    val (smallerArray, largerArray) = if (xSize > ySize) (y, x) else (x, y)

    if (intersectionIdLimit == 0) {
      val result = computeIntersectionUsingBinarySearchOnLargerByteBuffer(smallerArray, largerArray)
      WorkerIntersectionValue(result, xSize, ySize)
    } else {
      val (result, ids) = computeIntersectionWithIds(smallerArray, largerArray, intersectionIdLimit)
      WorkerIntersectionValue(result, xSize, ySize, ids)
    }
  }

  /**
   * Counts the values of `smallArray` that also occur in `largeArray`.
   * Note that this function assumes the ByteBuffer is encoded using Injections.seqLong2ByteBuffer
   *
   * @param smallArray array that is scanned linearly
   * @param largeArray SORTED array that is binary-searched
   * @return number of values of `smallArray` found in `largeArray`
   */
  def computeIntersectionUsingBinarySearchOnLargerByteBuffer(
    smallArray: ByteBuffer,
    largeArray: ByteBuffer
  ): Int = {
    var res: Int = 0
    // Absolute reads start at the buffer's position, not at byte 0.
    var i: Int = smallArray.position()
    val end: Int = i + (computeArraySize(smallArray) << 3)
    while (i < end) {
      if (binarySearch(largeArray, smallArray.getLong(i)) >= 0) {
        res += 1
      }
      i += BytesPerLong
    }
    res
  }

  /**
   * Counts the values of `smallArray` that also occur in `largeArray` and collects the first
   * `intersectionLimit` of them.
   *
   * @param smallArray        array that is scanned linearly
   * @param largeArray        SORTED array that is binary-searched
   * @param intersectionLimit maximum number of intersected ids to collect
   * @return the full intersection count and the collected ids, in `smallArray` order
   */
  def computeIntersectionWithIds(
    smallArray: ByteBuffer,
    largeArray: ByteBuffer,
    intersectionLimit: Int
  ): (Int, Seq[Long]) = {
    var res: Int = 0
    var i: Int = smallArray.position()
    val end: Int = i + (computeArraySize(smallArray) << 3)
    // Most of the intersectionLimit is smaller than default size: 16
    val idBuffer = ArrayBuffer[Long]()

    while (i < end) {
      val value = smallArray.getLong(i)
      if (binarySearch(largeArray, value) >= 0) {
        res += 1
        // The scan is in array order, so the ids kept are the first ones encountered.
        if (idBuffer.size < intersectionLimit) {
          idBuffer += value
        }
      }
      i += BytesPerLong
    }
    (res, idBuffer)
  }

  /**
   * Binary search over an encoded, SORTED array.
   * Note that this function assumes the ByteBuffer is encoded using Injections.seqLong2ByteBuffer
   *
   * @param arr   encoded array
   * @param value value to look for
   * @return the absolute byte index of `value` in `arr` (usable with `arr.getLong`), or -1 if
   *         the value is not present
   */
  private[util] def binarySearch(arr: ByteBuffer, value: Long): Int = {
    val base = arr.position()
    // Search over element indices; byte offsets are derived from them.
    var low = 0
    var high = computeArraySize(arr) - 1
    var found = -1

    while (found < 0 && low <= high) {
      val mid = (low + high) >>> 1 // unsigned shift keeps the midpoint correct on overflow
      val offset = base + (mid << 3)
      val current = arr.getLong(offset)
      if (current == value) {
        found = offset
      } else if (current < value) {
        low = mid + 1
      } else {
        high = mid - 1
      }
    }
    found
  }

  /**
   * TODO: for now it only computes intersection size. Will add more feature types (e.g., dot
   * product, maximum value).
   *
   * NOTE that this function assumes both x and y are SORTED arrays.
   * In graph feature service, the sorting is done in the offline Scalding job.
   *
   * @param x           source user's array
   * @param y           candidate user's array
   * @param featureType feature type
   * @return the intersection size together with the sizes of `x` and `y`
   */
  def apply(x: Array[Long], y: Array[Long], featureType: FeatureType): IntersectionValue = {
    val xSize = x.length
    val ySize = y.length

    // Binary search costs about min * log(max); merging costs about min + max.
    val binarySearchCost = xSize.min(ySize) * math.log(xSize.max(ySize))
    val mergeCost = xSize.toDouble + ySize // summed as Double so two large arrays cannot overflow

    val intersection =
      if (binarySearchCost < mergeCost) {
        if (xSize < ySize) {
          computeIntersectionUsingBinarySearchOnLargerArray(x, y)
        } else {
          computeIntersectionUsingBinarySearchOnLargerArray(y, x)
        }
      } else {
        computeIntersectionUsingListMerging(x, y)
      }

    IntersectionValue(
      featureType,
      Some(intersection),
      None, // return None for now
      Some(xSize),
      Some(ySize)
    )
  }

  /**
   * Function for computing the intersections of two SORTED arrays by list merging.
   *
   * @param x        one array
   * @param y        another array
   * @param ordering ordering function for comparing values of T
   * @tparam T type
   * @return The intersection size
   */
  private[util] def computeIntersectionUsingListMerging[T](
    x: Array[T],
    y: Array[T]
  )(
    implicit ordering: Ordering[T]
  ): Int = {
    var res: Int = 0
    var i: Int = 0
    var j: Int = 0

    while (i < x.length && j < y.length) {
      val comp = ordering.compare(x(i), y(j))
      if (comp > 0) j += 1
      else if (comp < 0) i += 1
      else {
        res += 1
        i += 1
        j += 1
      }
    }
    res
  }

  /**
   * Function for computing the intersections of two arrays by binary search on the larger array.
   * Note that the larger array MUST be SORTED.
   *
   * @param smallArray smaller array
   * @param largeArray larger array
   * @param ordering   ordering function for comparing values of T
   * @tparam T type
   *
   * @return The intersection size
   */
  private[util] def computeIntersectionUsingBinarySearchOnLargerArray[T](
    smallArray: Array[T],
    largeArray: Array[T]
  )(
    implicit ordering: Ordering[T]
  ): Int = {
    var res: Int = 0
    var i: Int = 0
    while (i < smallArray.length) {
      val currentValue: T = smallArray(i)
      if (binarySearch(largeArray, currentValue) >= 0) {
        res += 1
      }
      i += 1
    }
    res
  }

  /**
   * Function for doing the binary search
   *
   * @param arr      SORTED array
   * @param value    the target value for searching
   * @param ordering ordering function
   * @tparam T type
   * @return the index of element in the array.
   *         If there is no such element in the array, return -1.
   */
  private[util] def binarySearch[T](
    arr: Array[T],
    value: T
  )(
    implicit ordering: Ordering[T]
  ): Int = {
    var start = 0
    var end = arr.length - 1
    var found = -1

    while (found < 0 && start <= end) {
      val mid = (start + end) >>> 1 // unsigned shift keeps the midpoint correct on overflow
      val comp = ordering.compare(arr(mid), value)
      if (comp == 0) {
        found = mid
      } else if (comp < 0) {
        start = mid + 1
      } else {
        end = mid - 1
      }
    }
    found
  }
}

View File

@ -1,30 +0,0 @@
# Scala library target built from every Scala file under this directory (recursive glob).
scala_library(
sources = ["**/*.scala"],
platform = "java8",
tags = ["bazel-compatible"],
dependencies = [
# Dependency injection.
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/javax/inject:javax.inject",
"3rdparty/jvm/net/codingwell:scala-guice",
"discovery-common/src/main/scala/com/twitter/discovery/common/stats",
# Finatra (internal and open-source modules).
"finatra-internal/decider/src/main/scala",
"finatra-internal/gizmoduck/src/main/scala",
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra/inject/inject-app/src/main/scala",
"finatra/inject/inject-core/src/main/scala",
"finatra/inject/inject-server/src/main/scala",
"finatra/inject/inject-thrift-client/src/main/scala",
"finatra/inject/inject-utils/src/main/scala",
"frigate/frigate-common:constdb_util",
# Graph feature service resources, shared code and Thrift bindings.
"graph-feature-service/src/main/resources",
"graph-feature-service/src/main/scala/com/twitter/graph_feature_service/common",
"graph-feature-service/src/main/scala/com/twitter/graph_feature_service/util",
"graph-feature-service/src/main/thrift/com/twitter/graph_feature_service:graph_feature_service_thrift-scala",
"hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common",
"servo/request/src/main/scala",
"twitter-server-internal/src/main/scala",
"twitter-server/server/src/main/scala",
"util/util-app/src/main/scala",
"util/util-slf4j-api/src/main/scala",
],
)

View File

@ -1,58 +0,0 @@
package com.twitter.graph_feature_service.worker
import com.google.inject.Module
import com.twitter.finatra.decider.modules.DeciderModule
import com.twitter.finatra.gizmoduck.modules.TimerModule
import com.twitter.finatra.mtls.thriftmux.Mtls
import com.twitter.finatra.thrift.ThriftServer
import com.twitter.finatra.thrift.filters.{
LoggingMDCFilter,
StatsFilter,
ThriftMDCFilter,
TraceIdMDCFilter
}
import com.twitter.finatra.mtls.thriftmux.modules.MtlsThriftWebFormsModule
import com.twitter.finatra.thrift.routing.ThriftRouter
import com.twitter.graph_feature_service.thriftscala
import com.twitter.graph_feature_service.worker.controllers.WorkerController
import com.twitter.graph_feature_service.worker.handlers.WorkerWarmupHandler
import com.twitter.graph_feature_service.worker.modules.{
GraphContainerProviderModule,
WorkerFlagModule
}
import com.twitter.graph_feature_service.worker.util.GraphContainer
import com.twitter.inject.thrift.modules.ThriftClientIdModule
import com.twitter.util.Await
/** Entry point of the worker process. */
object Main extends WorkerMain

/**
 * Thrift server of the graph feature service worker.
 *
 * Installs the worker's modules, wires the Thrift filter chain in front of the controller and
 * loads the graphs during warmup.
 */
class WorkerMain extends ThriftServer with Mtls {

  override val name = "graph_feature_service-worker"

  override val modules: Seq[Module] = Seq(
    WorkerFlagModule,
    DeciderModule,
    TimerModule,
    ThriftClientIdModule,
    GraphContainerProviderModule,
    new MtlsThriftWebFormsModule[thriftscala.Worker.MethodPerEndpoint](this)
  )

  /**
   * Registers the filters (in the order listed) and then the controller.
   *
   * @param router router to configure
   */
  override def configureThrift(router: ThriftRouter): Unit = {
    router
      .filter[LoggingMDCFilter]
      .filter[TraceIdMDCFilter]
      .filter[ThriftMDCFilter]
      .filter[StatsFilter]
      .add[WorkerController]
  }

  /**
   * Blocks until the graph container has finished warming up, then runs the warmup handler.
   */
  override protected def warmup(): Unit = {
    val graphWarmup = injector.instance[GraphContainer].warmup
    Await.result(graphWarmup)
    handle[WorkerWarmupHandler]()
  }
}

View File

@ -1,38 +0,0 @@
package com.twitter.graph_feature_service.worker.controllers
import com.twitter.discovery.common.stats.DiscoveryStatsFilter
import com.twitter.finagle.Service
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finatra.thrift.Controller
import com.twitter.graph_feature_service.thriftscala
import com.twitter.graph_feature_service.thriftscala.Worker.GetIntersection
import com.twitter.graph_feature_service.thriftscala._
import com.twitter.graph_feature_service.worker.handlers._
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Thrift controller of the worker: exposes the GetIntersection endpoint.
 */
@Singleton
class WorkerController @Inject() (
  workerGetIntersectionHandler: WorkerGetIntersectionHandler
)(
  implicit statsReceiver: StatsReceiver)
    extends Controller(thriftscala.Worker) {

  // Stats for this endpoint are recorded under srv/get_intersection.
  private val getIntersectionStats = statsReceiver.scope("srv").scope("get_intersection")

  // use DiscoveryStatsFilter to filter out exceptions out of our control
  private val getIntersectionService: Service[
    WorkerIntersectionRequest,
    WorkerIntersectionResponse
  ] = {
    val statsFilter =
      new DiscoveryStatsFilter[WorkerIntersectionRequest, WorkerIntersectionResponse](
        getIntersectionStats
      )
    statsFilter.andThen(Service.mk(workerGetIntersectionHandler))
  }

  /** Unwraps the Thrift arguments, delegates to the handler and logs any failure. */
  val getIntersection: Service[GetIntersection.Args, WorkerIntersectionResponse] =
    Service.mk { args =>
      getIntersectionService(args.request).onFailure { throwable =>
        logger.error(s"Failure to get intersection for request $args.", throwable)
      }
    }

  handle(GetIntersection)(getIntersection)
}

View File

@ -1,105 +0,0 @@
package com.twitter.graph_feature_service.worker.handlers
import com.twitter.finagle.stats.{Stat, StatsReceiver}
import com.twitter.graph_feature_service.thriftscala.{
WorkerIntersectionRequest,
WorkerIntersectionResponse,
WorkerIntersectionValue
}
import com.twitter.graph_feature_service.util.{FeatureTypesCalculator, IntersectionValueCalculator}
import com.twitter.graph_feature_service.util.IntersectionValueCalculator._
import com.twitter.graph_feature_service.worker.util.GraphContainer
import com.twitter.servo.request.RequestHandler
import com.twitter.util.Future
import java.nio.ByteBuffer
import javax.inject.{Inject, Singleton}
/**
 * Computes, for one source user and a list of candidates, the intersection value of every
 * requested feature type using the graphs held by the graph container.
 */
@Singleton
class WorkerGetIntersectionHandler @Inject() (
  graphContainer: GraphContainer,
  statsReceiver: StatsReceiver)
    extends RequestHandler[WorkerIntersectionRequest, WorkerIntersectionResponse] {

  import WorkerGetIntersectionHandler._

  private val stats: StatsReceiver = statsReceiver.scope("srv/get_intersection")
  private val numCandidatesCount = stats.counter("total_num_candidates")
  private val toPartialGraphQueryStat = stats.stat("to_partial_graph_query_latency")
  private val fromPartialGraphQueryStat = stats.stat("from_partial_graph_query_latency")
  private val intersectionCalculationStat = stats.stat("computation_latency")

  /**
   * Builds the response for one request.
   *
   * The outer sequence of the response follows the order of the request's candidates and the
   * inner sequence follows the order of the resolved feature types; neither may be reordered.
   *
   * @param request source user, candidates, feature types and id limit
   * @return an already-completed future holding one value per (candidate, feature type)
   */
  override def apply(request: WorkerIntersectionRequest): Future[WorkerIntersectionResponse] = {
    numCandidatesCount.incr(request.candidateUserIds.length)

    val userId = request.userId
    val candidateIds = request.candidateUserIds
    val featureTypes =
      FeatureTypesCalculator.getFeatureTypes(request.presetFeatureTypes, request.featureTypes)

    val leftEdges = featureTypes.map(_.leftEdgeType).distinct
    val rightEdges = featureTypes.map(_.rightEdgeType).distinct

    // right edge type -> (candidate id -> that candidate's neighbors)
    val rightEdgeMap = Stat.time(toPartialGraphQueryStat) {
      rightEdges.map { rightEdge =>
        val neighborsByCandidate = graphContainer.toPartialMap.get(rightEdge) match {
          case None =>
            Map.empty[Long, ByteBuffer]
          case Some(graph) =>
            candidateIds.flatMap { candidateId =>
              graph(candidateId).map(neighbors => candidateId -> neighbors)
            }.toMap
        }
        rightEdge -> neighborsByCandidate
      }.toMap
    }

    // left edge type -> the source user's neighbors
    val leftEdgeMap = Stat.time(fromPartialGraphQueryStat) {
      leftEdges.flatMap { leftEdge =>
        graphContainer.toPartialMap
          .get(leftEdge)
          .flatMap(graph => graph(userId))
          .map(neighbors => leftEdge -> neighbors)
      }.toMap
    }

    val res = Stat.time(intersectionCalculationStat) {
      WorkerIntersectionResponse(
        candidateIds.map { candidateId =>
          featureTypes.map { featureType =>
            val leftNeighborsOpt = leftEdgeMap.get(featureType.leftEdgeType)
            val rightNeighborsOpt =
              rightEdgeMap.get(featureType.rightEdgeType).flatMap(_.get(candidateId))

            (leftNeighborsOpt, rightNeighborsOpt) match {
              case (None, None) =>
                EmptyWorkerIntersectionValue
              case (Some(leftNeighbors), None) =>
                EmptyWorkerIntersectionValue.copy(
                  leftNodeDegree = computeArraySize(leftNeighbors)
                )
              case (None, Some(rightNeighbors)) =>
                EmptyWorkerIntersectionValue.copy(
                  rightNodeDegree = computeArraySize(rightNeighbors)
                )
              case (Some(leftNeighbors), Some(rightNeighbors)) =>
                IntersectionValueCalculator(
                  leftNeighbors,
                  rightNeighbors,
                  request.intersectionIdLimit)
            }
          }
        }
      )
    }

    Future.value(res)
  }
}
/** Constants shared by the worker's intersection handler. */
object WorkerGetIntersectionHandler {
// Value with all three numeric fields set to 0 and no intersection ids; the handler copies it
// to fill in a single degree when only one side has neighbors.
val EmptyWorkerIntersectionValue: WorkerIntersectionValue = WorkerIntersectionValue(0, 0, 0, Nil)
}

View File

@ -1,14 +0,0 @@
package com.twitter.graph_feature_service.worker.handlers
import com.twitter.finatra.thrift.routing.ThriftWarmup
import com.twitter.inject.Logging
import com.twitter.inject.utils.Handler
import javax.inject.{Inject, Singleton}
/**
 * Warmup handler of the worker. It sends no warmup requests; it only logs that warmup is over.
 * The injected ThriftWarmup is currently unused.
 */
@Singleton
class WorkerWarmupHandler @Inject() (warmup: ThriftWarmup) extends Handler with Logging {

  /** Logs the completion message. */
  override def handle(): Unit = info("Warmup Done!")
}

View File

@ -1,62 +0,0 @@
package com.twitter.graph_feature_service.worker.modules
import com.google.inject.Provides
import com.twitter.concurrent.AsyncSemaphore
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.graph_feature_service.common.Configs._
import com.twitter.graph_feature_service.worker.util
import com.twitter.graph_feature_service.worker.util.AutoUpdatingGraph
import com.twitter.graph_feature_service.worker.util.FollowedByPartialValueGraph
import com.twitter.graph_feature_service.worker.util.FollowingPartialValueGraph
import com.twitter.graph_feature_service.worker.util.GraphContainer
import com.twitter.graph_feature_service.worker.util.GraphKey
import com.twitter.graph_feature_service.worker.util.MutualFollowPartialValueGraph
import com.twitter.inject.TwitterModule
import com.twitter.inject.annotations.Flag
import com.twitter.util.Timer
import javax.inject.Singleton
/**
 * Provides the singleton graph container holding the auto-updating graphs of this worker shard.
 */
object GraphContainerProviderModule extends TwitterModule {

  /**
   * Creates one auto-updating graph per enabled graph key and wraps them in a container.
   *
   * @param hdfsCluster    value of the HDFS cluster flag
   * @param hdfsClusterUrl value of the HDFS cluster url flag
   * @param shardId        value of the shard id flag
   * @param statsReceiver  root stats receiver; each graph reports under graphs/<key class name>
   * @param timer          timer handed to every graph
   * @return container with the enabled graphs
   */
  @Provides
  @Singleton
  def provideAutoUpdatingGraphs(
    @Flag(WorkerFlagNames.HdfsCluster) hdfsCluster: String,
    @Flag(WorkerFlagNames.HdfsClusterUrl) hdfsClusterUrl: String,
    @Flag(WorkerFlagNames.ShardId) shardId: Int
  )(
    implicit statsReceiver: StatsReceiver,
    timer: Timer
  ): GraphContainer = {
    // NOTE: to save RAM, only some of the graphs are loaded at the moment.
    val enabledGraphPaths: Map[GraphKey, String] = Map(
      FollowingPartialValueGraph -> FollowOutValPath,
      FollowedByPartialValueGraph -> FollowInValPath
    )

    // A single permit shared by all graphs: only one graph may update at a time.
    val sharedSemaphore = new AsyncSemaphore(1)

    val graphs: Map[GraphKey, AutoUpdatingGraph] =
      for ((graphKey, path) <- enabledGraphPaths) yield {
        val graphStats = statsReceiver
          .scope("graphs")
          .scope(graphKey.getClass.getSimpleName)

        graphKey -> AutoUpdatingGraph(
          dataPath = getHdfsPath(path),
          hdfsCluster = hdfsCluster,
          hdfsClusterUrl = hdfsClusterUrl,
          shard = shardId,
          minimumSizeForCompleteGraph = 1e6.toLong,
          sharedSemaphore = Some(sharedSemaphore)
        )(graphStats, timer)
      }

    GraphContainer(graphs)
  }
}

View File

@ -1,33 +0,0 @@
package com.twitter.graph_feature_service.worker.modules
import com.twitter.inject.TwitterModule
/**
 * Names of the command-line flags that WorkerFlagModule registers.
 *
 * Every member is a literal string constant holding the flag name.
 */
object WorkerFlagNames {
// Registered as a String flag ("Service Role").
final val ServiceRole = "service.role"
// Registered as a String flag ("Service Env").
final val ServiceEnv = "service.env"
// Registered as an Int flag ("Shard Id").
final val ShardId = "service.shardId"
// Registered as an Int flag ("Num of Graph Shards").
final val NumShards = "service.numShards"
// Registered as a String flag: HDFS cluster to download graph files from.
final val HdfsCluster = "service.hdfsCluster"
// Registered as a String flag: HDFS cluster url to download graph files from.
final val HdfsClusterUrl = "service.hdfsClusterUrl"
}
/**
 * Registers the worker's command-line flags so they can be injected by name.
 *
 * The values themselves are supplied at launch time (they are defined in the aurora.deploy
 * file). To see which values were picked up at runtime, search for FlagsModule in stdout.
 */
object WorkerFlagModule extends TwitterModule {

  // Which shard of the graph this worker serves, out of how many.
  flag[Int](WorkerFlagNames.ShardId, "Shard Id")
  flag[Int](WorkerFlagNames.NumShards, "Num of Graph Shards")

  // Identity of the running service instance.
  flag[String](WorkerFlagNames.ServiceRole, "Service Role")
  flag[String](WorkerFlagNames.ServiceEnv, "Service Env")

  // Where graph files are downloaded from.
  flag[String](WorkerFlagNames.HdfsCluster, "Hdfs cluster to download graph files from")
  flag[String](WorkerFlagNames.HdfsClusterUrl, "Hdfs cluster url to download graph files from")
}

View File

@ -1,69 +0,0 @@
package com.twitter.graph_feature_service.worker.util
import com.twitter.bijection.Injection
import com.twitter.concurrent.AsyncSemaphore
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.frigate.common.constdb_util.{
AutoUpdatingReadOnlyGraph,
ConstDBImporter,
Injections
}
import com.twitter.graph_feature_service.common.Configs
import com.twitter.util.{Duration, Future, Timer}
import java.nio.ByteBuffer
/**
 * A graph keyed by Long ids with ByteBuffer values that is loaded from HDFS and kept up to date
 * by the AutoUpdatingReadOnlyGraph / ConstDBImporter machinery it extends.
 *
 * @param dataPath the path to the data on HDFS
 * @param hdfsCluster cluster where we check for updates and download graph files from
 * @param hdfsClusterUrl url to HDFS cluster
 * @param shard The shard of the graph to download
 * @param minimumSizeForCompleteGraph minimumSize for complete graph - otherwise we don't load it
 * @param updateIntervalMin The interval after which the first update is tried and the interval between such updates
 * @param updateIntervalMax the maximum time before an update is triggered
 * @param deleteInterval The interval after which older data is deleted from disk
 * @param sharedSemaphore The semaphore controls the number of graph loads at same time on the instance.
 */
case class AutoUpdatingGraph(
dataPath: String,
hdfsCluster: String,
hdfsClusterUrl: String,
shard: Int,
minimumSizeForCompleteGraph: Long,
updateIntervalMin: Duration = 1.hour,
updateIntervalMax: Duration = 12.hours,
deleteInterval: Duration = 2.seconds,
sharedSemaphore: Option[AsyncSemaphore] = None
)(
implicit statsReceiver: StatsReceiver,
timer: Timer)
extends AutoUpdatingReadOnlyGraph[Long, ByteBuffer](
hdfsCluster,
hdfsClusterUrl,
shard,
minimumSizeForCompleteGraph,
updateIntervalMin,
updateIntervalMax,
deleteInterval,
sharedSemaphore
)
with ConstDBImporter[Long, ByteBuffer] {
// Total number of graph shards comes from shared configuration, not from a constructor argument.
override def numGraphShards: Int = Configs.NumGraphShards
// dataPath is not passed to the parent constructor; it is exposed to it through basePath.
override def basePath: String = dataPath
// Keys are converted between Long and ByteBuffer with the varint injection.
override val keyInj: Injection[Long, ByteBuffer] = Injections.long2Varint
// Values are kept as ByteBuffers as-is.
override val valueInj: Injection[ByteBuffer, ByteBuffer] = Injection.identity
// Delegates to the parent lookup and, when a value is found, records its remaining byte count
// in the "get/size" stat. The looked-up result is returned unchanged.
override def get(targetId: Long): Future[Option[ByteBuffer]] =
super
.get(targetId)
.map { res =>
res.foreach(r => arraySizeStat.add(r.remaining()))
res
}
// NOTE(review): declared after `get`, so it is only initialized once the class body has run;
// presumably `get` is never invoked during parent construction - confirm.
private val arraySizeStat = stats.scope("get").stat("size")
}

View File

@ -1,14 +0,0 @@
package com.twitter.graph_feature_service.worker.util
import com.twitter.graph_feature_service.thriftscala.EdgeType
/**
 * A query against a graph: the edge type to look at and the user whose edges are wanted.
 * The hierarchy is sealed, so all query kinds are declared in this file.
 */
sealed trait GfsQuery {
// Edge type the query is about.
def edgeType: EdgeType
// User whose edges are looked up.
def userId: Long
}
/**
 * Query for the edges of the given type of one user.
 * NOTE(review): presumably answered from the partial graph held by the local partition —
 * confirm against the code that consumes this query.
 */
case class ToPartialQuery(edgeType: EdgeType, userId: Long) extends GfsQuery

View File

@ -1,19 +0,0 @@
package com.twitter.graph_feature_service.worker.util
import com.twitter.graph_feature_service.thriftscala.EdgeType
import com.twitter.util.Future
/**
 * Holds the graphs loaded by this worker and indexes them by edge type.
 *
 * @param graphs the loaded graphs, keyed by graph key
 */
case class GraphContainer(
  graphs: Map[GraphKey, AutoUpdatingGraph]) {

  /** The graphs whose key is a partial value graph, keyed by that key's edge type. */
  final val toPartialMap: Map[EdgeType, AutoUpdatingGraph] =
    graphs.collect {
      case (partialValueGraph: PartialValueGraph, graph) =>
        partialValueGraph.edgeType -> graph
    }

  /**
   * Load all the graphs from constantDB format to memory.
   *
   * @return a future that completes once every graph has finished warming up
   */
  def warmup: Future[Unit] = {
    // A strict map starts each warmup exactly once. `mapValues` would return a lazy view that
    // re-invokes `warmup()` every time the resulting map is traversed.
    val warmups = graphs.map {
      case (graphKey, graph) => graphKey -> graph.warmup()
    }
    Future.collect(warmups).unit
  }
}

View File

@ -1,32 +0,0 @@
package com.twitter.graph_feature_service.worker.util
import com.twitter.graph_feature_service.thriftscala.EdgeType
import com.twitter.graph_feature_service.thriftscala.EdgeType._
/**
 * Identifies a graph and the edge type it stores.
 * The hierarchy is sealed, so all keys are declared in this file.
 */
sealed trait GraphKey {
// Edge type stored in the graph identified by this key.
def edgeType: EdgeType
}
/** Marker for keys of partial value graphs. */
sealed trait PartialValueGraph extends GraphKey
/**
 * Follow Graphs
 */
// Key of the graph storing Following edges.
object FollowingPartialValueGraph extends PartialValueGraph {
override def edgeType: EdgeType = Following
}
// Key of the graph storing FollowedBy edges.
object FollowedByPartialValueGraph extends PartialValueGraph {
override def edgeType: EdgeType = FollowedBy
}
/**
 * Mutual Follow Graphs
 */
// Key of the graph storing MutualFollow edges.
object MutualFollowPartialValueGraph extends PartialValueGraph {
override def edgeType: EdgeType = MutualFollow
}

View File

@ -1,16 +0,0 @@
package com.twitter.graph_feature_service.worker.util
// These classes are to help the GraphContainer choose the right data structure to answer queries.
// NOTE(review): no use of these markers is visible alongside this file — confirm they are still
// referenced.
// Sealed: every graph type is one of the objects declared below.
sealed trait GraphType
object FollowGraph extends GraphType
object FavoriteGraph extends GraphType
object RetweetGraph extends GraphType
object ReplyGraph extends GraphType
object MentionGraph extends GraphType
object MutualFollowGraph extends GraphType

View File

@ -1,66 +0,0 @@
# Scala library holding the Scalding job code of this directory.
# It has no explicit name; the job targets in this file depend on it as ":scalding".
scala_library(
platform = "java8",
tags = [
"bazel-compatible",
"bazel-only",
],
dependencies = [
"3rdparty/jvm/com/twitter/bijection:core",
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/constdb_util",
"graph-feature-service/src/main/scala/com/twitter/graph_feature_service/common",
# Input datasets.
"src/scala/com/twitter/interaction_graph/scio/agg_all:interaction_graph_history_aggregated_edge_snapshot-scala",
"src/scala/com/twitter/interaction_graph/scio/ml/scores:real_graph_in_scores-scala",
"src/scala/com/twitter/pluck/source/user_audits:user_audit_final-scala",
# Scalding job infrastructure.
"src/scala/com/twitter/scalding_internal/dalv2",
"src/scala/com/twitter/scalding_internal/job",
"src/scala/com/twitter/scalding_internal/job/analytics_batch",
],
)
# Ad hoc (manually launched) Scalding job. It has no cron schedule and is started with a fixed
# --date argument.
scalding_job(
name = "graph_feature_service_adhoc_job",
main = "com.twitter.graph_feature_service.scalding.GraphFeatureServiceAdhocApp",
args = [
# Date the ad hoc run processes; edit before launching for another day.
"--date 2022-10-24",
],
config = [
("hadoop.map.jvm.total-memory", "3072m"),
("hadoop.reduce.jvm.total-memory", "3072m"),
("hadoop.submitter.jvm.total-memory", "5120m"),
("submitter.tier", "preemptible"),
],
contact = "recos-platform-alerts@twitter.com",
hadoop_cluster = "atla-proc",
hadoop_properties = [("mapreduce.job.hdfs-servers", "/atla/proc/user/cassowary")],
platform = "java8",
role = "cassowary",
runtime_platform = "java8",
tags = [
"bazel-compatible:migrated",
"bazel-only",
],
dependencies = [":scalding"],
)
# Scheduled Scalding job; same memory, cluster and role settings as the ad hoc job in this file,
# plus a cron schedule.
scalding_job(
name = "graph_feature_service_daily_job",
main = "com.twitter.graph_feature_service.scalding.GraphFeatureServiceScheduledApp",
config = [
("hadoop.map.jvm.total-memory", "3072m"),
("hadoop.reduce.jvm.total-memory", "3072m"),
("hadoop.submitter.jvm.total-memory", "5120m"),
("submitter.tier", "preemptible"),
],
contact = "recos-platform-alerts@twitter.com",
# Fires at minute 1 and minute 31 of every hour.
# NOTE(review): the target is named "daily" — presumably the app itself decides whether a new
# batch is due on each trigger; confirm.
cron = "01,31 * * * *",
hadoop_cluster = "atla-proc",
hadoop_properties = [("mapreduce.job.hdfs-servers", "/atla/proc/user/cassowary")],
platform = "java8",
role = "cassowary",
runtime_platform = "java8",
tags = [
"bazel-compatible:migrated",
"bazel-only",
],
dependencies = [":scalding"],
)

View File

@ -1,9 +0,0 @@
package com.twitter.graph_feature_service.scalding
/**
 * Scores attached to one directed edge of the graph.
 *
 * Only `realGraphScore` is mandatory; every other score defaults to `None`, meaning that
 * no value was recorded for that kind of interaction on this edge.
 *
 * @param realGraphScore    overall edge weight
 * @param followScore       optional follow score
 * @param mutualFollowScore optional mutual-follow score
 * @param favoriteScore     optional favorite score
 * @param retweetScore      optional retweet score
 * @param mentionScore      optional mention score
 */
case class EdgeFeature(
  realGraphScore: Float,
  followScore: Option[Float] = None,
  mutualFollowScore: Option[Float] = None,
  favoriteScore: Option[Float] = None,
  retweetScore: Option[Float] = None,
  mentionScore: Option[Float] = None
)

View File

@ -1,85 +0,0 @@
package com.twitter.graph_feature_service.scalding
import com.twitter.scalding._
import com.twitter.scalding_internal.job.TwitterExecutionApp
import com.twitter.scalding_internal.job.analytics_batch.{
AnalyticsBatchExecution,
AnalyticsBatchExecutionArgs,
BatchDescription,
BatchFirstTime,
BatchIncrement,
TwitterScheduledExecutionApp
}
import java.util.TimeZone
/**
 * Common contract for Graph Feature Service batch jobs.
 *
 * A concrete job supplies [[runOnDateRange]] only; keeping the work behind that single method
 * makes the job easy to drive from tests. The trait also fixes the time zone (UTC) and the
 * date parser used when interpreting date arguments.
 */
trait GraphFeatureServiceBaseJob {
  implicit val timeZone: TimeZone = DateOps.UTC
  implicit val dateParser: DateParser = DateParser.default

  /**
   * Runs the job for the implicit date range.
   *
   * @param enableValueGraphs optional override for the value-graph switch
   * @param enableKeyGraphs   optional override for the key-graph switch
   */
  def runOnDateRange(
    enableValueGraphs: Option[Boolean] = None,
    enableKeyGraphs: Option[Boolean] = None
  )(
    implicit dateRange: DateRange,
    timeZone: TimeZone,
    uniqueID: UniqueID
  ): Execution[Unit]

  /**
   * Runs `execution`, then prints each of its counters to stdout as a tab-separated
   * `group<TAB>counter<TAB>value` line, ordered by group and then counter name.
   * The result of `execution` itself is discarded.
   */
  def printerCounters[T](execution: Execution[T]): Execution[Unit] =
    execution.getCounters.flatMap {
      case (_, counters) =>
        val ordered = counters.toMap.toSeq.sortBy { case (key, _) => (key.group, key.counter) }
        ordered.foreach {
          case (statKey, value) =>
            println(s"${statKey.group}\t${statKey.counter}\t$value")
        }
        Execution.unit
    }
}
/**
 * Entry-point plumbing for ad hoc (manually launched) runs.
 *
 * The date range is taken from the `date` command-line argument, the job is run over it
 * with default graph switches, and the resulting counters are printed.
 */
trait GraphFeatureServiceAdhocBaseApp extends TwitterExecutionApp with GraphFeatureServiceBaseJob {
  override def job: Execution[Unit] = Execution.withId { implicit uniqueId =>
    Execution.getArgs.flatMap { args: Args =>
      val requestedDates = args.list("date")
      implicit val dateRange: DateRange = DateRange.parse(requestedDates)(timeZone, dateParser)
      val batch = runOnDateRange()
      printerCounters(batch)
    }
  }
}
/**
 * Entry-point plumbing for scheduled (batched) runs.
 *
 * An implementing app has to provide the first batch date; the batch step defaults to one day
 * and may be overridden. Each batch runs the job with default graph switches over the batch's
 * date range and prints the resulting counters.
 */
trait GraphFeatureServiceScheduledBaseApp
    extends TwitterScheduledExecutionApp
    with GraphFeatureServiceBaseJob {

  /** Date of the first batch, for example: RichDate("2018-02-21"). */
  def firstTime: RichDate

  /** Length of one batch. */
  def batchIncrement: Duration = Days(1)

  override def scheduledJob: Execution[Unit] = Execution.withId { implicit uniqueId =>
    // The batch is identified by the concrete app's class name.
    val description = BatchDescription(getClass.getName)
    val start = BatchFirstTime(firstTime)
    val step = BatchIncrement(batchIncrement)
    val batchArgs = AnalyticsBatchExecutionArgs(
      batchDesc = description,
      firstTime = start,
      batchIncrement = step
    )

    AnalyticsBatchExecution(batchArgs) { implicit dateRange =>
      val batch = runOnDateRange()
      printerCounters(batch)
    }
  }
}

View File

@ -1,52 +0,0 @@
package com.twitter.graph_feature_service.scalding
import com.twitter.scalding.DateRange
import com.twitter.scalding.Execution
import com.twitter.scalding.RichDate
import com.twitter.scalding.UniqueID
import java.util.Calendar
import java.util.TimeZone
import sun.util.calendar.BaseCalendar
/**
 * Ad hoc entry point: the main job wired into the ad hoc application plumbing.
 *
 * Launch command:
 *
 scalding remote run --target graph-feature-service/src/main/scalding/com/twitter/graph_feature_service/scalding:graph_feature_service_adhoc_job
 */
object GraphFeatureServiceAdhocApp
    extends GraphFeatureServiceMainJob
    with GraphFeatureServiceAdhocBaseApp
/**
 * Scheduled entry point: the main job wired into the scheduled application plumbing.
 *
 * To schedule the job, upload the workflows config (only required for the first time and subsequent config changes):
 * scalding workflow upload --jobs graph-feature-service/src/main/scalding/com/twitter/graph_feature_service/scalding:graph_feature_service_daily_job --autoplay --build-cron-schedule "20 23 1 * *"
 * You can then build from the UI by clicking "Build" and pasting in your remote branch, or leave it empty if you're redeploying from master.
 * The workflows config above should automatically trigger once each month.
 */
object GraphFeatureServiceScheduledApp
    extends GraphFeatureServiceMainJob
    with GraphFeatureServiceScheduledBaseApp {

  /** First batch date for the scheduled job. */
  override def firstTime: RichDate = RichDate("2018-05-18")

  /**
   * Runs the main job with fixed switches: value graphs are always written and key graphs are
   * never written. The `enableValueGraphs` / `enableKeyGraphs` arguments passed by the caller
   * are intentionally ignored.
   *
   * NOTE(review): this method used to compute a "Tuesday / Thursday / Saturday only" flag for
   * value graphs (via the JDK-internal `sun.util.calendar.BaseCalendar`), but the flag was never
   * passed on, so value graphs have always been written on every run. The dead computation has
   * been removed to match the real behaviour. If day-of-week gating is actually wanted, derive
   * it from `dateRange.start.toCalendar.get(Calendar.DAY_OF_WEEK)` using the public
   * `java.util.Calendar` constants and pass it as the first argument below.
   */
  override def runOnDateRange(
    enableValueGraphs: Option[Boolean],
    enableKeyGraphs: Option[Boolean]
  )(
    implicit dateRange: DateRange,
    timeZone: TimeZone,
    uniqueID: UniqueID
  ): Execution[Unit] = {
    super.runOnDateRange(
      Some(true), // always write the value graphs
      Some(false) // disable key Graphs since we are not using them in production
    )
  }
}

View File

@ -1,297 +0,0 @@
package com.twitter.graph_feature_service.scalding
import com.twitter.bijection.Injection
import com.twitter.frigate.common.constdb_util.Injections
import com.twitter.frigate.common.constdb_util.ScaldingUtil
import com.twitter.graph_feature_service.common.Configs
import com.twitter.graph_feature_service.common.Configs._
import com.twitter.interaction_graph.scio.agg_all.InteractionGraphHistoryAggregatedEdgeSnapshotScalaDataset
import com.twitter.interaction_graph.scio.ml.scores.RealGraphInScoresScalaDataset
import com.twitter.interaction_graph.thriftscala.FeatureName
import com.twitter.interaction_graph.thriftscala.{EdgeFeature => TEdgeFeature}
import com.twitter.pluck.source.user_audits.UserAuditFinalScalaDataset
import com.twitter.scalding.DateRange
import com.twitter.scalding.Days
import com.twitter.scalding.Execution
import com.twitter.scalding.Stat
import com.twitter.scalding.UniqueID
import com.twitter.scalding.typed.TypedPipe
import com.twitter.scalding_internal.dalv2.DAL
import com.twitter.scalding_internal.dalv2.remote_access.AllowCrossClusterSameDC
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.util.Time
import com.twitter.wtf.candidate.thriftscala.CandidateSeq
import java.nio.ByteBuffer
import java.util.TimeZone
trait GraphFeatureServiceMainJob extends GraphFeatureServiceBaseJob {
// The HDFS root lives in its own member so that unit tests can override it.
protected val hdfsPath: String = BaseHdfsPath

// Maps a user id to its shard; overridable hook around the shared shardForUser function.
protected def getShardIdForUser(userId: Long): Int = shardForUser(userId)

// Serializers for the keys and values written to the const DB.
protected implicit val keyInj: Injection[Long, ByteBuffer] = Injections.long2Varint
protected implicit val valueInj: Injection[Long, ByteBuffer] = Injections.long2ByteBuffer

// Tuning knobs handed to ScaldingUtil.writeConstDB.
protected val bufferSize: Int = 1 << 26
protected val maxNumKeys: Int = 1 << 24
protected val numReducers: Int = NumGraphShards
protected val outputStreamBufferSize: Int = 1 << 26

// Shard an edge by the user in its key position.
protected final val shardingByKey: (Long, Long) => Int =
  (key, _) => getShardIdForUser(key)

// Shard an edge by the user in its value position.
protected final val shardingByValue: (Long, Long) => Int =
  (_, value) => getShardIdForUser(value)
/**
 * Writes one graph (a pipe of id pairs) to sharded const DB files on HDFS.
 *
 * @param graph            pairs to write; the pipe is tagged with the description "sharding $path"
 * @param shardingFunction picks the shard for each pair
 * @param path             graph-specific path segment, resolved under `hdfsPath`
 * @param dateRange        its end timestamp is used to build the timed shard path
 * @return the execution produced by `ScaldingUtil.writeConstDB`, forced to disk
 */
private def writeGraphToDB(
graph: TypedPipe[(Long, Long)],
shardingFunction: (Long, Long) => Int,
path: String
)(
implicit dateRange: DateRange
): Execution[TypedPipe[(Int, Unit)]] = {
ScaldingUtil
.writeConstDB[Long, Long](
graph.withDescription(s"sharding $path"),
shardingFunction,
// Output location for a given shard: timed path under the (overridable) HDFS root.
shardId =>
getTimedHdfsShardPath(
shardId,
getHdfsPath(path, Some(hdfsPath)),
Time.fromMilliseconds(dateRange.end.timestamp)
),
// NOTE(review): meaning of this positional Int.MaxValue is defined by writeConstDB's
// signature, which is not visible here — confirm against ScaldingUtil.
Int.MaxValue,
bufferSize,
maxNumKeys,
numReducers,
outputStreamBufferSize
)(
// Passed explicitly: keyInj and valueInj share the same static type, so they cannot be
// told apart by implicit resolution.
keyInj,
valueInj,
Ordering[(Long, Long)]
)
.forceToDiskExecution
}
/**
 * Looks up one named feature in an edge's feature list.
 *
 * @param featureList features attached to an edge
 * @param featureName the feature to look for
 * @return the EWMA of the first feature with that name, as a Float, provided it is strictly
 *         positive; `None` when the feature is missing or its EWMA is not positive
 */
def extractFeature(
  featureList: Seq[TEdgeFeature],
  featureName: FeatureName
): Option[Float] = {
  val ewma = featureList.collectFirst {
    case feature if feature.name == featureName => feature.tss.ewma.toFloat
  }
  ewma.filter(_ > 0.0)
}
/**
 * Extracts a sub-graph (e.g. the follow graph) from the real graph, keeping for every source
 * user only the top-K neighbours ranked by real graph score.
 *
 * @param input      real graph as (source, destination, features) triples
 * @param edgeFilter predicate selecting the edges that belong to the sub-graph
 *                   (e.g. only follow edges)
 * @param counter    incremented once per source user that is emitted
 * @return (source, destination) pairs, at most `TopKRealGraph` per source user
 */
private def getSubGraph(
  input: TypedPipe[(Long, Long, EdgeFeature)],
  edgeFilter: EdgeFeature => Boolean,
  counter: Stat
): TypedPipe[(Long, Long)] = {
  val keptEdges = input.filter { case (_, _, features) => edgeFilter(features) }

  val scoredBySource = keptEdges.map {
    case (srcId, destId, features) =>
      (srcId, (destId, features.realGraphScore))
  }

  val topKPerSource = scoredBySource.group
  // auto reducer estimation only allocates 15 reducers, so setting an explicit number here
    .withReducers(2000)
    .sortedReverseTake(TopKRealGraph)(Ordering.by(_._2))

  topKPerSource.flatMap {
    case (srcId, topKNeighbors) =>
      counter.inc()
      topKNeighbors.map { case (destId, _) => (srcId, destId) }
  }
}
/**
 * Reads the user audit dataset and returns the distinct ids of all users whose audit record
 * is valid.
 *
 * Counters: `NUM_MAUS` counts valid audit records before de-duplication, `UNIQUE_MAUS`
 * counts the distinct user ids that are emitted.
 */
def getMauIds()(implicit dateRange: DateRange, uniqueID: UniqueID): TypedPipe[Long] = {
  val numMAUs = Stat("NUM_MAUS")
  val uniqueMAUs = Stat("UNIQUE_MAUS")

  val validUserIds = DAL
    .read(UserAuditFinalScalaDataset)
    .withRemoteReadPolicy(AllowCrossClusterSameDC)
    .toTypedPipe
    .collect {
      case user_audit if user_audit.isValid =>
        numMAUs.inc()
        user_audit.userId
    }

  validUserIds.distinct.map { userId =>
    uniqueMAUs.inc()
    userId
  }
}
/**
 * Builds the real graph restricted to edges whose source AND destination both appear in the
 * user set returned by [[getMauIds]].
 *
 * Edges come from the most recent interaction-graph aggregated-edge snapshot found within the
 * date range widened by five days. Each edge carries an [[EdgeFeature]] holding the edge weight
 * (0 when absent) and the mutual-follow, favorite, retweet and mention scores extracted from
 * the edge's feature list. No follow feature is extracted here, so `followScore` stays `None`.
 *
 * @return (source, destination, features) triples
 */
def getRealGraphWithMAUOnly(
  implicit dateRange: DateRange,
  timeZone: TimeZone,
  uniqueID: UniqueID
): TypedPipe[(Long, Long, EdgeFeature)] = {
  // Shared user pipeline (same NUM_MAUS / UNIQUE_MAUS counters) instead of a private copy of it.
  val monthlyActiveUsers = getMauIds().asKeys

  DAL
    .readMostRecentSnapshot(
      InteractionGraphHistoryAggregatedEdgeSnapshotScalaDataset,
      dateRange.embiggen(Days(5)))
    .withRemoteReadPolicy(AllowCrossClusterSameDC)
    .toTypedPipe
    .map { edge =>
      val featureList = edge.features
      // Named arguments are required here: EdgeFeature's second field is followScore, so
      // passing these four scores positionally shifts each of them into the wrong field and
      // leaves mentionScore permanently empty.
      val edgeFeature = EdgeFeature(
        realGraphScore = edge.weight.getOrElse(0.0).toFloat,
        mutualFollowScore = extractFeature(featureList, FeatureName.NumMutualFollows),
        favoriteScore = extractFeature(featureList, FeatureName.NumFavorites),
        retweetScore = extractFeature(featureList, FeatureName.NumRetweets),
        mentionScore = extractFeature(featureList, FeatureName.NumMentions)
      )
      (edge.sourceId, (edge.destinationId, edgeFeature))
    }
    // Keep edges whose source is in the user set, then re-key by destination...
    .join(monthlyActiveUsers)
    .map {
      case (srcId, ((destId, feature), _)) =>
        (destId, (srcId, feature))
    }
    // ...and keep edges whose destination is in the user set as well.
    .join(monthlyActiveUsers)
    .map {
      case (destId, ((srcId, feature), _)) =>
        (srcId, destId, feature)
    }
}
/**
 * Builds the follow graph from the most recent real-graph in-scores snapshot found within the
 * date range widened by seven days.
 *
 * Only records whose key appears in the user set from [[getMauIds]] are kept. For each kept
 * record the candidates are ordered by descending score and the first `TopKRealGraph` of them
 * are emitted as (key, candidate user id) pairs.
 *
 * Counter: `NumFollowEdges_MAU` is incremented once per kept record.
 */
def getTopKFollowGraph(
  implicit dateRange: DateRange,
  timeZone: TimeZone,
  uniqueID: UniqueID
): TypedPipe[(Long, Long)] = {
  val followGraphMauStat = Stat("NumFollowEdges_MAU")
  val mauKeys = getMauIds().asKeys

  val inScoresByKey = DAL
    .readMostRecentSnapshot(RealGraphInScoresScalaDataset, dateRange.embiggen(Days(7)))
    .withRemoteReadPolicy(AllowCrossClusterSameDC)
    .toTypedPipe
    .groupBy(_.key)

  inScoresByKey
    .join(mauKeys)
    .withDescription("filtering srcId by mau")
    .flatMap {
      case (_, (KeyVal(srcId, CandidateSeq(candidates)), _)) =>
        followGraphMauStat.inc()
        candidates
          .sortBy(-_.score)
          .take(TopKRealGraph)
          .map(candidate => (srcId, candidate.userId))
    }
}
/**
 * Builds the follow graph and writes it out in the enabled layouts.
 *
 * Each switch falls back to its default in `Configs` when the argument is `None`. With both
 * switches off the batch is skipped entirely. Otherwise the follow graph and its reversed
 * form are written once per enabled layout: sharded by value for the value graphs, sharded
 * by key for the key graphs.
 *
 * The favorite / retweet / mention / mutual-follow graphs are currently disabled; their code
 * is kept below as comments.
 */
override def runOnDateRange(
  enableValueGraphs: Option[Boolean],
  enableKeyGraphs: Option[Boolean]
)(
  implicit dateRange: DateRange,
  timeZone: TimeZone,
  uniqueID: UniqueID
): Execution[Unit] = {
  val processValueGraphs = enableValueGraphs.getOrElse(Configs.EnableValueGraphs)
  val processKeyGraphs = enableKeyGraphs.getOrElse(Configs.EnableKeyGraphs)

  if (processKeyGraphs || processValueGraphs) {
    // val favoriteGraphStat = Stat("NumFavoriteEdges")
    // val retweetGraphStat = Stat("NumRetweetEdges")
    // val mentionGraphStat = Stat("NumMentionEdges")
    // val realGraphAggregates = getRealGraphWithMAUOnly
    val followGraph = getTopKFollowGraph
    // val mutualFollowGraph = followGraph.asKeys.join(followGraph.swap.asKeys).keys
    // val favoriteGraph =
    // getSubGraph(realGraphAggregates, _.favoriteScore.isDefined, favoriteGraphStat)
    // val retweetGraph =
    // getSubGraph(realGraphAggregates, _.retweetScore.isDefined, retweetGraphStat)
    // val mentionGraph =
    // getSubGraph(realGraphAggregates, _.mentionScore.isDefined, mentionGraphStat)

    val writeValDataSetExecutions =
      if (!processValueGraphs) Seq.empty
      else
        Seq(
          (followGraph, shardingByValue, FollowOutValPath),
          (followGraph.swap, shardingByValue, FollowInValPath)
          // (mutualFollowGraph, shardingByValue, MutualFollowValPath),
          // (favoriteGraph, shardingByValue, FavoriteOutValPath),
          // (favoriteGraph.swap, shardingByValue, FavoriteInValPath),
          // (retweetGraph, shardingByValue, RetweetOutValPath),
          // (retweetGraph.swap, shardingByValue, RetweetInValPath),
          // (mentionGraph, shardingByValue, MentionOutValPath),
          // (mentionGraph.swap, shardingByValue, MentionInValPath)
        )

    val writeKeyDataSetExecutions =
      if (!processKeyGraphs) Seq.empty
      else
        Seq(
          (followGraph, shardingByKey, FollowOutKeyPath),
          (followGraph.swap, shardingByKey, FollowInKeyPath)
          // (favoriteGraph, shardingByKey, FavoriteOutKeyPath),
          // (favoriteGraph.swap, shardingByKey, FavoriteInKeyPath),
          // (retweetGraph, shardingByKey, RetweetOutKeyPath),
          // (retweetGraph.swap, shardingByKey, RetweetInKeyPath),
          // (mentionGraph, shardingByKey, MentionOutKeyPath),
          // (mentionGraph.swap, shardingByKey, MentionInKeyPath),
          // (mutualFollowGraph, shardingByKey, MutualFollowKeyPath)
        )

    // Value-graph writes first, then key-graph writes; results are discarded.
    val pendingWrites = (writeValDataSetExecutions ++ writeKeyDataSetExecutions).map {
      case (graph, shardingMethod, path) =>
        writeGraphToDB(graph, shardingMethod, path)
    }
    Execution.sequence(pendingWrites).unit
  } else {
    // Skip the batch job
    Execution.unit
  }
}
}

View File

@ -1,27 +0,0 @@
# Library target for the ad hoc Scalding code in this directory.
# NOTE(review): no explicit `name` or `sources` is given; the `:adhoc` dependency of the
# hadoop_binary below presumably resolves to this target through build-system defaults — confirm.
scala_library(
platform = "java8",
tags = ["bazel-only"],
dependencies = [
"3rdparty/jvm/com/twitter/bijection:core",
"3rdparty/jvm/com/twitter/bijection:scrooge",
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/constdb_util",
"src/java/com/twitter/ml/api:api-base",
"src/scala/com/twitter/ml/api:api-base",
"src/scala/com/twitter/scalding_internal/job",
"src/scala/com/twitter/scalding_internal/job/analytics_batch",
"src/thrift/com/twitter/ml/api:data-java",
],
)
# Hadoop bundle whose entry point is RandomRequestGenerationApp.
# The bundle name matches the `--bundle gfs_random_request-adhoc` flag shown in that app's
# launch instructions.
hadoop_binary(
name = "gfs_random_request-adhoc",
main = "com.twitter.graph_feature_service.scalding.adhoc.RandomRequestGenerationApp",
platform = "java8",
runtime_platform = "java8",
tags = [
"bazel-compatible",
"bazel-compatible:migrated",
"bazel-only",
],
# Code comes from the `:adhoc` library target of this directory.
dependencies = [":adhoc"],
)

View File

@ -1,77 +0,0 @@
package com.twitter.graph_feature_service.scalding.adhoc
import com.twitter.bijection.Injection
import com.twitter.frigate.common.constdb_util.Injections
import com.twitter.ml.api.Feature.Discrete
import com.twitter.ml.api.{DailySuffixFeatureSource, DataSetPipe, RichDataRecord}
import com.twitter.scalding._
import com.twitter.scalding_internal.job.TwitterExecutionApp
import java.nio.ByteBuffer
import java.util.TimeZone
/**
 * Samples (user id, author id) pairs from a data-record dataset and writes them out as TSV.
 * The defaults declared here point at the timeline recap dataset and an ad hoc output folder.
 */
object RandomRequestGenerationJob {
  implicit val timeZone: TimeZone = DateOps.UTC
  implicit val dateParser: DateParser = DateParser.default

  // Default input: timeline recap data records.
  val timelineRecapDataSetPath: String =
    "/atla/proc2/user/timelines/processed/suggests/recap/data_records"

  // Features read from every record.
  val USER_ID = new Discrete("meta.user_id")
  val AUTHOR_ID = new Discrete("meta.author_id")

  // Default output location.
  val timelineRecapOutPutPath: String = "/user/cassowary/gfs/adhoc/timeline_data"

  implicit val inj: Injection[Long, ByteBuffer] = Injections.long2Varint

  /**
   * Reads the daily-suffixed feature source at `dataSetPath`, turns every record into a
   * (user id, author id) pair, keeps at most `numOfPairsToTake` pairs and writes them as TSV
   * to `outPutPath`.
   *
   * Counter: `NumUserAuthorPairs` is incremented for every record that is mapped, i.e. before
   * the limit is applied.
   */
  def run(
    dataSetPath: String,
    outPutPath: String,
    numOfPairsToTake: Int
  )(
    implicit dateRange: DateRange,
    uniqueID: UniqueID
  ): Execution[Unit] = {
    val pairCounter = Stat("NumUserAuthorPairs")
    val dataSet: DataSetPipe = DailySuffixFeatureSource(dataSetPath).read

    val userAuthorPairs: TypedPipe[(Long, Long)] = dataSet.records.map { record =>
      val rich = new RichDataRecord(record, dataSet.featureContext)
      val userId = rich.getFeatureValue(USER_ID)
      val authorId = rich.getFeatureValue(AUTHOR_ID)
      pairCounter.inc()
      (userId, authorId)
    }

    val sampledPairs = userAuthorPairs.limit(numOfPairsToTake)
    sampledPairs.writeExecution(TypedTsv[(Long, Long)](outPutPath))
  }
}
/**
 * Command-line entry point for [[RandomRequestGenerationJob]].
 *
 * Arguments: `--date` (required date range), `--input` and `--output` (optional, defaulting to
 * the timeline recap paths of the job object) and `--num_pairs` (optional, default 3000).
 *
 * Build and launch:
 *
 * ./bazel bundle graph-feature-service/src/main/scalding/com/twitter/graph_feature_service/scalding/adhoc:all
 *
 * oscar hdfs --screen --user cassowary --tee gfs_log --bundle gfs_random_request-adhoc \
 --tool com.twitter.graph_feature_service.scalding.adhoc.RandomRequestGenerationApp \
 -- --date 2018-08-11 \
 --input /atla/proc2/user/timelines/processed/suggests/recap/data_records \
 --output /user/cassowary/gfs/adhoc/timeline_data
 */
object RandomRequestGenerationApp extends TwitterExecutionApp {
  import RandomRequestGenerationJob._

  override def job: Execution[Unit] = Execution.withId { implicit uniqueId =>
    Execution.getArgs.flatMap { args: Args =>
      implicit val dateRange: DateRange = DateRange.parse(args.list("date"))(timeZone, dateParser)

      val inputPath = args.optional("input").getOrElse(timelineRecapDataSetPath)
      val outputPath = args.optional("output").getOrElse(timelineRecapOutPutPath)
      val pairLimit = args.int("num_pairs", 3000)

      run(inputPath, outputPath, pairLimit)
    }
  }
}

View File

@ -1,15 +0,0 @@
# Generates the Graph Feature Service Thrift libraries from every *.thrift file in this
# directory, for each language listed in `generate_languages`.
create_thrift_libraries(
base_name = "graph_feature_service_thrift",
sources = ["*.thrift"],
platform = "java8",
tags = ["bazel-compatible"],
generate_languages = [
"java",
# ruby is added due to ruby dependees in timelines
"ruby",
"scala",
"strato",
],
# Published artifact names for the Java and Scala bindings.
provides_java_name = "graph_feature_service_thrift_java",
provides_scala_name = "graph_feature_service_thrift_scala",
)

View File

@ -1,123 +0,0 @@
namespace java com.twitter.graph_feature_service.thriftjava
#@namespace scala com.twitter.graph_feature_service.thriftscala
#@namespace strato com.twitter.graph_feature_service.thriftscala
// edge type to differentiate different types of graphs (we can also add a lot of other types of edges)
// Values are implicit, i.e. assigned by position starting at 0, so entries must not be
// reordered or removed; the RESERVED_n names match their positional value.
// NOTE: REPLYED_BY is a misspelling of "replied by"; the identifier is part of the generated
// API, so it is kept as is.
enum EdgeType {
FOLLOWING,
FOLLOWED_BY,
FAVORITE,
FAVORITED_BY,
RETWEET,
RETWEETED_BY,
REPLY,
REPLYED_BY,
MENTION,
MENTIONED_BY,
MUTUAL_FOLLOW,
SIMILAR_TO, // more edge types (like block, report, etc.) can be supported later.
RESERVED_12,
RESERVED_13,
RESERVED_14,
RESERVED_15,
RESERVED_16,
RESERVED_17,
RESERVED_18,
RESERVED_19,
RESERVED_20
}
// Named presets that a request can pass instead of an explicit list of FeatureType values
// (see GfsPresetIntersectionRequest). Values are implicit (positional), so do not reorder.
// NOTE(review): the feature types behind each preset are defined outside this file.
enum PresetFeatureTypes {
EMPTY,
HTL_TWO_HOP,
WTF_TWO_HOP,
SQ_TWO_HOP,
RUX_TWO_HOP,
MR_TWO_HOP,
USER_TYPEAHEAD_TWO_HOP
}
// A user id paired with an integer count.
struct UserWithCount {
1: required i64 userId(personalDataType = 'UserId')
2: required i32 count
}(hasPersonalData = 'true')
// A user id paired with a floating-point score.
struct UserWithScore {
1: required i64 userId(personalDataType = 'UserId')
2: required double score
}(hasPersonalData = 'true')
// Feature Type
// A feature is defined by a pair of edge types: one followed from the source user and one
// followed from the candidate user.
// For example, to compute how many of the source user's followings have favorited the candidate
// user, we need to compute the intersection between the source user's FOLLOWING edges and the
// candidate user's FAVORITED_BY edges. In this case, we should use
// FeatureType(FOLLOWING, FAVORITED_BY).
struct FeatureType {
1: required EdgeType leftEdgeType // edge type from source user
2: required EdgeType rightEdgeType // edge type from candidate user
}(persisted="true")
// Result for one FeatureType of one candidate. Everything except featureType is optional.
// NOTE(review): presumably `count` is the intersection size, `intersectionIds` the (possibly
// truncated) intersected user ids, and the two degrees the sizes of the left / right
// neighbour sets — confirm against the server implementation.
struct IntersectionValue {
1: required FeatureType featureType
2: optional i32 count
3: optional list<i64> intersectionIds(personalDataType = 'UserId')
4: optional i32 leftNodeDegree
5: optional i32 rightNodeDegree
}(persisted="true", hasPersonalData = 'true')
// All intersection values computed for a single candidate user.
struct GfsIntersectionResult {
1: required i64 candidateUserId(personalDataType = 'UserId')
2: required list<IntersectionValue> intersectionValues
}(hasPersonalData = 'true')
// Request for Server.getIntersection: a source user, the candidate users to compare against,
// and the explicit list of feature types to compute.
// NOTE(review): unlike GfsPresetIntersectionRequest, this struct carries UserId fields but no
// hasPersonalData annotation — confirm whether that is intentional.
struct GfsIntersectionRequest {
1: required i64 userId(personalDataType = 'UserId')
2: required list<i64> candidateUserIds(personalDataType = 'UserId')
3: required list<FeatureType> featureTypes
// NOTE(review): presumably caps the number of ids returned in intersectionIds — confirm.
4: optional i32 intersectionIdLimit
}
// Request for Server.getPresetIntersection: same shape as GfsIntersectionRequest, except that
// the feature types are selected through a PresetFeatureTypes value instead of an explicit list.
struct GfsPresetIntersectionRequest {
1: required i64 userId(personalDataType = 'UserId')
2: required list<i64> candidateUserIds(personalDataType = 'UserId')
3: required PresetFeatureTypes presetFeatureTypes
// NOTE(review): presumably caps the number of ids returned in intersectionIds — confirm.
4: optional i32 intersectionIdLimit
}(hasPersonalData = 'true')
// Response of both Server endpoints: a list of per-candidate results.
struct GfsIntersectionResponse {
1: required list<GfsIntersectionResult> results
}
// Public Graph Feature Service API. Both endpoints return the same response type; they differ
// only in how the feature types are specified (explicit list vs. preset).
service Server {
GfsIntersectionResponse getIntersection(1: GfsIntersectionRequest request)
GfsIntersectionResponse getPresetIntersection(1: GfsPresetIntersectionRequest request)
}
###################################################################################################
## For internal usage only
###################################################################################################
// Request for Worker.getIntersection. Carries both an explicit feature-type list and a preset,
// and, unlike the public requests, requires intersectionIdLimit.
struct WorkerIntersectionRequest {
1: required i64 userId(personalDataType = 'UserId')
2: required list<i64> candidateUserIds(personalDataType = 'UserId')
3: required list<FeatureType> featureTypes
4: required PresetFeatureTypes presetFeatureTypes
5: required i32 intersectionIdLimit
}(hasPersonalData = 'true')
// Response of Worker.getIntersection: a list of lists of WorkerIntersectionValue.
// NOTE(review): presumably the outer list is per candidate and the inner list per feature
// type, in request order — confirm against the worker implementation.
struct WorkerIntersectionResponse {
1: required list<list<WorkerIntersectionValue>> results
}
// Worker-side counterpart of IntersectionValue, without the feature type.
// Fields are declared without `required` / `optional`, i.e. with Thrift's default requiredness.
struct WorkerIntersectionValue {
1: i32 count
2: i32 leftNodeDegree
3: i32 rightNodeDegree
4: list<i64> intersectionIds(personalDataType = 'UserId')
}(hasPersonalData = 'true')
// Wrapper around a list of WorkerIntersectionValue.
// NOTE(review): by its name, the form stored in the server's cache — confirm.
struct CachedIntersectionResult {
1: required list<WorkerIntersectionValue> values
}
// Internal worker API (see the "For internal usage only" banner above the worker structs).
service Worker {
WorkerIntersectionResponse getIntersection(1: WorkerIntersectionRequest request)
}

View File

@ -1,30 +0,0 @@
# Runnable Home Mixer server binary; entry point is HomeMixerServerMain.
jvm_binary(
name = "bin",
basename = "home-mixer",
main = "com.twitter.home_mixer.HomeMixerServerMain",
runtime_platform = "java11",
tags = ["bazel-compatible"],
# The server code itself plus logging / tracing / twitter-server runtime support.
dependencies = [
"3rdparty/jvm/ch/qos/logback:logback-classic",
"finagle/finagle-zipkin-scribe/src/main/scala",
"finatra/inject/inject-logback/src/main/scala",
"home-mixer/server/src/main/scala/com/twitter/home_mixer",
"loglens/loglens-logback/src/main/scala/com/twitter/loglens/logback",
"twitter-server-internal/src/main/scala",
"twitter-server/logback-classic/src/main/scala",
],
)
# Aurora Workflows build phase convention requires a jvm_app named with home-mixer-app
# Packages the `:bin` binary together with the files under config/ into a zip archive.
jvm_app(
name = "home-mixer-app",
archive = "zip",
binary = ":bin",
bundles = [
bundle(
fileset = ["config/**/*"],
owning_target = "home-mixer/config:files",
),
],
tags = ["bazel-compatible"],
)

BIN
home-mixer/BUILD.docx Normal file

Binary file not shown.

BIN
home-mixer/README.docx Normal file

Binary file not shown.

View File

@ -1,101 +0,0 @@
Home Mixer
==========
Home Mixer is the main service used to construct and serve Twitter's Home Timelines. It currently
powers:
- For you - best Tweets from people you follow + recommended out-of-network content
- Following - reverse chronological Tweets from people you follow
- Lists - reverse chronological Tweets from List members
Home Mixer is built on Product Mixer, our custom Scala framework that facilitates building
feeds of content.
## Overview
The For You recommendation algorithm in Home Mixer involves the following stages:
- Candidate Generation - fetch Tweets from various Candidate Sources. For example:
- Earlybird Search Index
- User Tweet Entity Graph
- Cr Mixer
- Follow Recommendations Service
- Feature Hydration
- Fetch the ~6000 features needed for ranking
- Scoring and Ranking using ML model
- Filters and Heuristics. For example:
- Author Diversity
- Content Balance (In network vs Out of Network)
- Feedback fatigue
- Deduplication / previously seen Tweets removal
- Visibility Filtering (blocked, muted authors/tweets, NSFW settings)
- Mixing - integrate Tweets with non-Tweet content
- Ads
- Who-to-follow modules
- Prompts
- Product Features and Serving
- Conversation Modules for replies
- Social Context
- Timeline Navigation
- Edited Tweets
- Feedback options
- Pagination and cursoring
- Observability and logging
- Client instructions and content marshalling
## Pipeline Structure
### General
Product Mixer services like Home Mixer are structured around Pipelines that split the execution
into transparent and structured steps.
Requests first go to Product Pipelines, which are used to select which Mixer Pipeline or
Recommendation Pipeline to run for a given request. Each Mixer or Recommendation
Pipeline may run multiple Candidate Pipelines to fetch candidates to include in the response.
Mixer Pipelines combine the results of multiple heterogeneous Candidate Pipelines together
(e.g. ads, tweets, users) while Recommendation Pipelines are used to score (via Scoring Pipelines)
and rank the results of homogeneous Candidate Pipelines so that the top ranked ones can be returned.
These pipelines also marshal candidates into a domain object and then into a transport object
to return to the caller.
Candidate Pipelines fetch candidates from underlying Candidate Sources and perform some basic
operations on the Candidates, such as filtering out unwanted candidates, applying decorations,
and hydrating features.
The sections below describe the high level pipeline structure (non-exhaustive) for the main Home
Timeline tabs powered by Home Mixer.
### For You
- ForYouProductPipelineConfig
- ForYouScoredTweetsMixerPipelineConfig (main orchestration layer - mixes Tweets with ads and users)
- ForYouScoredTweetsCandidatePipelineConfig (fetch Tweets)
- ScoredTweetsRecommendationPipelineConfig (main Tweet recommendation layer)
- Fetch Tweet Candidates
- ScoredTweetsInNetworkCandidatePipelineConfig
- ScoredTweetsTweetMixerCandidatePipelineConfig
- ScoredTweetsUtegCandidatePipelineConfig
- ScoredTweetsFrsCandidatePipelineConfig
- Feature Hydration and Scoring
- ScoredTweetsScoringPipelineConfig
- ForYouConversationServiceCandidatePipelineConfig (backup reverse chron pipeline in case Scored Tweets fails)
- ForYouAdsCandidatePipelineConfig (fetch ads)
- ForYouWhoToFollowCandidatePipelineConfig (fetch users to recommend)
### Following
- FollowingProductPipelineConfig
- FollowingMixerPipelineConfig
- FollowingEarlybirdCandidatePipelineConfig (fetch tweets from Search Index)
- ConversationServiceCandidatePipelineConfig (fetch ancestors for conversation modules)
- FollowingAdsCandidatePipelineConfig (fetch ads)
- FollowingWhoToFollowCandidatePipelineConfig (fetch users to recommend)
### Lists
- ListTweetsProductPipelineConfig
- ListTweetsMixerPipelineConfig
- ListTweetsTimelineServiceCandidatePipelineConfig (fetch tweets from timeline service)
- ConversationServiceCandidatePipelineConfig (fetch ancestors for conversation modules)
- ListTweetsAdsCandidatePipelineConfig (fetch ads)

View File

@ -1,51 +0,0 @@
# Home Mixer server library: every *.scala file in this directory (the server class and its
# warmup handler), built with the "fatal_warnings" compiler option set and strict deps.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
# Dependency injection and logging APIs.
"3rdparty/jvm/com/google/inject:guice",
"3rdparty/jvm/javax/inject:javax.inject",
"3rdparty/jvm/net/codingwell:scala-guice",
"3rdparty/jvm/org/slf4j:slf4j-api",
# Finagle / Finatra server stack.
"finagle/finagle-core/src/main",
"finagle/finagle-http/src/main/scala",
"finagle/finagle-thriftmux/src/main/scala",
"finatra-internal/mtls-http/src/main/scala",
"finatra-internal/mtls-thriftmux/src/main/scala",
"finatra/http-core/src/main/java/com/twitter/finatra/http",
"finatra/inject/inject-app/src/main/java/com/twitter/inject/annotations",
"finatra/inject/inject-app/src/main/scala",
"finatra/inject/inject-core/src/main/scala",
"finatra/inject/inject-server/src/main/scala",
"finatra/inject/inject-utils/src/main/scala",
# Home Mixer's own packages and Thrift definitions.
"home-mixer/server/src/main/resources",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/controller",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/federated",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/module",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/param",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/product",
"home-mixer/thrift/src/main/thrift:thrift-scala",
# Product Mixer framework.
"product-mixer/component-library/src/main/scala/com/twitter/product_mixer/component_library/module",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/controllers",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/module",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/module/stringcenter",
"product-mixer/core/src/main/thrift/com/twitter/product_mixer/core:thrift-scala",
"src/thrift/com/twitter/timelines/render:thrift-scala",
# Strato.
"strato/config/columns/auth-context:auth-context-strato-client",
"strato/config/columns/gizmoduck:gizmoduck-strato-client",
"strato/src/main/scala/com/twitter/strato/fed",
"strato/src/main/scala/com/twitter/strato/fed/server",
# String Center, Thrift web forms and timelines libraries.
"stringcenter/client",
"stringcenter/client/src/main/java",
"stringcenter/client/src/main/scala/com/twitter/stringcenter/client",
"thrift-web-forms/src/main/scala/com/twitter/thriftwebforms/view",
"timelines/src/main/scala/com/twitter/timelines/config",
"timelines/src/main/scala/com/twitter/timelines/features/app",
# twitter-server and util.
"twitter-server-internal",
"twitter-server/server/src/main/scala",
"util/util-app/src/main/scala",
"util/util-core:scala",
"util/util-slf4j-api/src/main/scala",
],
)

View File

@ -1,18 +0,0 @@
package com.twitter.home_mixer
import com.twitter.finatra.http.routing.HttpWarmup
import com.twitter.finatra.httpclient.RequestBuilder._
import com.twitter.util.logging.Logging
import com.twitter.inject.utils.Handler
import com.twitter.util.Try
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Warmup hook for the HTTP server: issues one GET against the admin endpoint
 * `/admin/product-mixer/product-pipelines`.
 *
 * Warmup is best effort: a failing request is logged as an error and otherwise ignored.
 */
@Singleton
class HomeMixerHttpServerWarmupHandler @Inject() (warmup: HttpWarmup) extends Handler with Logging {

  override def handle(): Unit = {
    val attempt = Try {
      warmup.send(get("/admin/product-mixer/product-pipelines"), admin = true)()
    }
    attempt.onFailure(e => error(e.getMessage, e))
  }
}

View File

@ -1,128 +0,0 @@
package com.twitter.home_mixer
import com.google.inject.Module
import com.twitter.finagle.Filter
import com.twitter.finatra.annotations.DarkTrafficFilterType
import com.twitter.finatra.http.HttpServer
import com.twitter.finatra.http.routing.HttpRouter
import com.twitter.finatra.mtls.http.{Mtls => HttpMtls}
import com.twitter.finatra.mtls.thriftmux.Mtls
import com.twitter.finatra.mtls.thriftmux.modules.MtlsThriftWebFormsModule
import com.twitter.finatra.thrift.ThriftServer
import com.twitter.finatra.thrift.filters._
import com.twitter.finatra.thrift.routing.ThriftRouter
import com.twitter.home_mixer.controller.HomeThriftController
import com.twitter.home_mixer.federated.HomeMixerColumn
import com.twitter.home_mixer.module._
import com.twitter.home_mixer.param.GlobalParamConfigModule
import com.twitter.home_mixer.product.HomeMixerProductModule
import com.twitter.home_mixer.{thriftscala => st}
import com.twitter.product_mixer.component_library.module.AccountRecommendationsMixerModule
import com.twitter.product_mixer.component_library.module.DarkTrafficFilterModule
import com.twitter.product_mixer.component_library.module.EarlybirdModule
import com.twitter.product_mixer.component_library.module.ExploreRankerClientModule
import com.twitter.product_mixer.component_library.module.GizmoduckClientModule
import com.twitter.product_mixer.component_library.module.OnboardingTaskServiceModule
import com.twitter.product_mixer.component_library.module.SocialGraphServiceModule
import com.twitter.product_mixer.component_library.module.TimelineRankerClientModule
import com.twitter.product_mixer.component_library.module.TimelineScorerClientModule
import com.twitter.product_mixer.component_library.module.TimelineServiceClientModule
import com.twitter.product_mixer.component_library.module.TweetImpressionStoreModule
import com.twitter.product_mixer.component_library.module.TweetMixerClientModule
import com.twitter.product_mixer.component_library.module.UserSessionStoreModule
import com.twitter.product_mixer.core.controllers.ProductMixerController
import com.twitter.product_mixer.core.module.LoggingThrowableExceptionMapper
import com.twitter.product_mixer.core.module.ProductMixerModule
import com.twitter.product_mixer.core.module.stringcenter.ProductScopeStringCenterModule
import com.twitter.strato.fed.StratoFed
import com.twitter.strato.fed.server.StratoFedServer
/** Runnable singleton of [[HomeMixerServer]]; this is the `main` class named by the `bin` target. */
object HomeMixerServerMain extends HomeMixerServer
class HomeMixerServer
extends StratoFedServer
with ThriftServer
with Mtls
with HttpServer
with HttpMtls {
override val name = "home-mixer-server"
override val modules: Seq[Module] = Seq(
AccountRecommendationsMixerModule,
AdvertiserBrandSafetySettingsStoreModule,
BlenderClientModule,
ClientSentImpressionsPublisherModule,
ConversationServiceModule,
EarlybirdModule,
ExploreRankerClientModule,
FeedbackHistoryClientModule,
GizmoduckClientModule,
GlobalParamConfigModule,
HomeAdsCandidateSourceModule,
HomeMixerFlagsModule,
HomeMixerProductModule,
HomeMixerResourcesModule,
ImpressionBloomFilterModule,
InjectionHistoryClientModule,
ManhattanClientsModule,
ManhattanFeatureRepositoryModule,
ManhattanTweetImpressionStoreModule,
MemcachedFeatureRepositoryModule,
NaviModelClientModule,
OnboardingTaskServiceModule,
OptimizedStratoClientModule,
PeopleDiscoveryServiceModule,
ProductMixerModule,
RealGraphInNetworkScoresModule,
RealtimeAggregateFeatureRepositoryModule,
ScoredTweetsMemcacheModule,
ScribeEventPublisherModule,
SimClustersRecentEngagementsClientModule,
SocialGraphServiceModule,
StaleTweetsCacheModule,
ThriftFeatureRepositoryModule,
TimelineRankerClientModule,
TimelineScorerClientModule,
TimelineServiceClientModule,
TimelinesPersistenceStoreClientModule,
TopicSocialProofClientModule,
TweetImpressionStoreModule,
TweetMixerClientModule,
TweetypieClientModule,
TweetypieStaticEntitiesCacheClientModule,
UserSessionStoreModule,
new DarkTrafficFilterModule[st.HomeMixer.ReqRepServicePerEndpoint](),
new MtlsThriftWebFormsModule[st.HomeMixer.MethodPerEndpoint](this),
new ProductScopeStringCenterModule()
)
override def configureThrift(router: ThriftRouter): Unit = {
router
.filter[LoggingMDCFilter]
.filter[TraceIdMDCFilter]
.filter[ThriftMDCFilter]
.filter[StatsFilter]
.filter[AccessLoggingFilter]
.filter[ExceptionMappingFilter]
.filter[Filter.TypeAgnostic, DarkTrafficFilterType]
.exceptionMapper[LoggingThrowableExceptionMapper]
.exceptionMapper[PipelineFailureExceptionMapper]
.add[HomeThriftController]
}
override def configureHttp(router: HttpRouter): Unit =
router.add(
ProductMixerController[st.HomeMixer.MethodPerEndpoint](
this.injector,
st.HomeMixer.ExecutePipeline))
override val dest: String = "/s/home-mixer/home-mixer:strato"
override val columns: Seq[Class[_ <: StratoFed.Column]] =
Seq(classOf[HomeMixerColumn])
override protected def warmup(): Unit = {
handle[HomeMixerThriftServerWarmupHandler]()
handle[HomeMixerHttpServerWarmupHandler]()
}
}

View File

@ -1,73 +0,0 @@
package com.twitter.home_mixer
import com.twitter.finagle.thrift.ClientId
import com.twitter.finatra.thrift.routing.ThriftWarmup
import com.twitter.home_mixer.{thriftscala => st}
import com.twitter.util.logging.Logging
import com.twitter.inject.utils.Handler
import com.twitter.product_mixer.core.{thriftscala => pt}
import com.twitter.scrooge.Request
import com.twitter.scrooge.Response
import com.twitter.util.Return
import com.twitter.util.Throw
import com.twitter.util.Try
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Warm-up handler that sends a few synthetic `GetUrtResponse` Thrift requests through
 * [[ThriftWarmup]] under a dedicated warm-up client id.
 *
 * Warm-up is best effort: non-fatal failures are logged and never rethrown from `handle()`.
 */
@Singleton
class HomeMixerThriftServerWarmupHandler @Inject() (warmup: ThriftWarmup)
    extends Handler
    with Logging {

  private val clientId = ClientId("thrift-warmup-client")

  /**
   * Sends one warm-up request per test user id.
   *
   * Only non-fatal exceptions are caught and logged. Fatal errors (for example
   * `OutOfMemoryError` or `InterruptedException`) propagate to the caller instead of being
   * silently swallowed.
   */
  def handle(): Unit = {
    val testIds = Seq(1, 2, 3)
    try {
      clientId.asCurrent {
        testIds.foreach { id =>
          val warmupReq = warmupQuery(id)
          info(s"Sending warm-up request to service with query: $warmupReq")
          warmup.sendRequest(
            method = st.HomeMixer.GetUrtResponse,
            req = Request(st.HomeMixer.GetUrtResponse.Args(warmupReq)))(assertWarmupResponse)
        }
      }
    } catch {
      // Referenced by its full path so the file's import block does not need to change.
      case scala.util.control.NonFatal(e) => error(e.getMessage, e)
    }
    info("Warm-up done.")
  }

  /**
   * Builds a synthetic request for the Following product on behalf of `userId`, using
   * placeholder client-context values.
   */
  private def warmupQuery(userId: Long): st.HomeMixerRequest = {
    val clientContext = pt.ClientContext(
      userId = Some(userId),
      guestId = None,
      appId = Some(12345L),
      ipAddress = Some("0.0.0.0"),
      userAgent = Some("FAKE_USER_AGENT_FOR_WARMUPS"),
      countryCode = Some("US"),
      languageCode = Some("en"),
      isTwoffice = None,
      userRoles = None,
      deviceId = Some("FAKE_DEVICE_ID_FOR_WARMUPS")
    )

    st.HomeMixerRequest(
      clientContext = clientContext,
      product = st.Product.Following,
      productContext = Some(st.ProductContext.Following(st.Following())),
      maxResults = Some(3)
    )
  }

  /** Response callback for a warm-up request: successes are ignored, failures are logged. */
  private def assertWarmupResponse(
    result: Try[Response[st.HomeMixer.GetUrtResponse.SuccessType]]
  ): Unit = {
    result match {
      case Return(_) => // ok
      case Throw(exception) =>
        warn("Error performing warm-up request.")
        error(exception.getMessage, exception)
    }
  }
}

View File

@ -1,26 +0,0 @@
# Library target built from every *.scala file in this directory.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
"home-mixer/server/src/main/scala/com/twitter/home_mixer/functional_component/candidate_source",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/functional_component/decorator/urt/builder",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/functional_component/feature_hydrator",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/functional_component/filter",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/functional_component/gate",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/functional_component/query_transformer",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/model",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/model/request",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/service",
"product-mixer/component-library/src/main/scala/com/twitter/product_mixer/component_library/candidate_source/tweetconvosvc",
"product-mixer/component-library/src/main/scala/com/twitter/product_mixer/component_library/decorator/urt",
"product-mixer/component-library/src/main/scala/com/twitter/product_mixer/component_library/filter",
"product-mixer/component-library/src/main/scala/com/twitter/product_mixer/component_library/gate",
"product-mixer/component-library/src/main/scala/com/twitter/product_mixer/component_library/model/candidate",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/functional_component/transformer",
],
# The tweetconvosvc candidate source is listed again here in addition to `dependencies`.
# NOTE(review): presumably so targets depending on this one get it transitively — confirm.
exports = [
"product-mixer/component-library/src/main/scala/com/twitter/product_mixer/component_library/candidate_source/tweetconvosvc",
],
)

View File

@ -1,116 +0,0 @@
package com.twitter.home_mixer.candidate_pipeline
import com.twitter.home_mixer.functional_component.feature_hydrator.InNetworkFeatureHydrator
import com.twitter.home_mixer.functional_component.feature_hydrator.NamesFeatureHydrator
import com.twitter.home_mixer.functional_component.feature_hydrator.TweetypieFeatureHydrator
import com.twitter.home_mixer.functional_component.filter.InvalidConversationModuleFilter
import com.twitter.home_mixer.functional_component.filter.InvalidSubscriptionTweetFilter
import com.twitter.home_mixer.functional_component.filter.RetweetDeduplicationFilter
import com.twitter.home_mixer.model.HomeFeatures.AuthorIdFeature
import com.twitter.home_mixer.model.HomeFeatures.InReplyToTweetIdFeature
import com.twitter.home_mixer.model.HomeFeatures.IsHydratedFeature
import com.twitter.home_mixer.model.HomeFeatures.QuotedTweetDroppedFeature
import com.twitter.home_mixer.model.HomeFeatures.SourceTweetIdFeature
import com.twitter.home_mixer.model.HomeFeatures.SourceUserIdFeature
import com.twitter.home_mixer.service.HomeMixerAlertConfig
import com.twitter.product_mixer.component_library.candidate_source.tweetconvosvc.ConversationServiceCandidateSource
import com.twitter.product_mixer.component_library.candidate_source.tweetconvosvc.ConversationServiceCandidateSourceRequest
import com.twitter.product_mixer.component_library.candidate_source.tweetconvosvc.TweetWithConversationMetadata
import com.twitter.product_mixer.component_library.filter.FeatureFilter
import com.twitter.product_mixer.component_library.filter.PredicateFeatureFilter
import com.twitter.product_mixer.component_library.model.candidate.TweetCandidate
import com.twitter.product_mixer.core.functional_component.candidate_source.BaseCandidateSource
import com.twitter.product_mixer.core.functional_component.decorator.CandidateDecorator
import com.twitter.product_mixer.core.functional_component.feature_hydrator.BaseCandidateFeatureHydrator
import com.twitter.product_mixer.core.functional_component.filter.Filter
import com.twitter.product_mixer.core.functional_component.gate.BaseGate
import com.twitter.product_mixer.core.functional_component.transformer.CandidateFeatureTransformer
import com.twitter.product_mixer.core.functional_component.transformer.CandidatePipelineResultsTransformer
import com.twitter.product_mixer.core.functional_component.transformer.DependentCandidatePipelineQueryTransformer
import com.twitter.product_mixer.core.model.common.identifier.CandidatePipelineIdentifier
import com.twitter.product_mixer.core.model.common.identifier.FilterIdentifier
import com.twitter.product_mixer.core.pipeline.PipelineQuery
import com.twitter.product_mixer.core.pipeline.candidate.DependentCandidatePipelineConfig
/**
 * Dependent candidate pipeline config backed by the Conversation Service candidate source.
 *
 * Candidates produced earlier in the request are converted into a
 * [[ConversationServiceCandidateSourceRequest]], and each returned tweet is emitted as a
 * [[TweetCandidate]]. Gates and the decorator are supplied by the caller.
 */
class ConversationServiceCandidatePipelineConfig[Query <: PipelineQuery](
  conversationServiceCandidateSource: ConversationServiceCandidateSource,
  tweetypieFeatureHydrator: TweetypieFeatureHydrator,
  namesFeatureHydrator: NamesFeatureHydrator,
  invalidSubscriptionTweetFilter: InvalidSubscriptionTweetFilter,
  override val gates: Seq[BaseGate[Query]],
  override val decorator: Option[CandidateDecorator[Query, TweetCandidate]])
    extends DependentCandidatePipelineConfig[
      Query,
      ConversationServiceCandidateSourceRequest,
      TweetWithConversationMetadata,
      TweetCandidate
    ] {

  override val identifier: CandidatePipelineIdentifier =
    CandidatePipelineIdentifier("ConversationService")

  // Identifier names for the two feature-based filters built in `filters`.
  private val TweetypieHydratedFilterId = "TweetypieHydrated"
  private val QuotedTweetDroppedFilterId = "QuotedTweetDropped"

  override val candidateSource: BaseCandidateSource[
    ConversationServiceCandidateSourceRequest,
    TweetWithConversationMetadata
  ] = conversationServiceCandidateSource

  // Builds the source request from the existing candidates. Conversation id and ancestors are
  // left empty on the request side.
  override val queryTransformer: DependentCandidatePipelineQueryTransformer[
    Query,
    ConversationServiceCandidateSourceRequest
  ] = { (_, existingCandidates) =>
    ConversationServiceCandidateSourceRequest(
      existingCandidates.map { existing =>
        val featureMap = existing.features
        TweetWithConversationMetadata(
          tweetId = existing.candidateIdLong,
          userId = featureMap.getOrElse(AuthorIdFeature, None),
          sourceTweetId = featureMap.getOrElse(SourceTweetIdFeature, None),
          sourceUserId = featureMap.getOrElse(SourceUserIdFeature, None),
          inReplyToTweetId = featureMap.getOrElse(InReplyToTweetIdFeature, None),
          conversationId = None,
          ancestors = Seq.empty
        )
      }
    )
  }

  override val featuresFromCandidateSourceTransformers: Seq[
    CandidateFeatureTransformer[TweetWithConversationMetadata]
  ] = Seq(ConversationServiceResponseFeatureTransformer)

  override val resultTransformer: CandidatePipelineResultsTransformer[
    TweetWithConversationMetadata,
    TweetCandidate
  ] = { tweetWithMetadata => TweetCandidate(id = tweetWithMetadata.tweetId) }

  override val preFilterFeatureHydrationPhase1: Seq[
    BaseCandidateFeatureHydrator[Query, TweetCandidate, _]
  ] = Seq(tweetypieFeatureHydrator, InNetworkFeatureHydrator)

  override def filters: Seq[Filter[Query, TweetCandidate]] = Seq(
    RetweetDeduplicationFilter,
    FeatureFilter.fromFeature(FilterIdentifier(TweetypieHydratedFilterId), IsHydratedFeature),
    // Keeps a candidate unless its QuotedTweetDroppedFeature is set to true.
    PredicateFeatureFilter.fromPredicate(
      FilterIdentifier(QuotedTweetDroppedFilterId),
      shouldKeepCandidate = { featureMap =>
        !featureMap.getOrElse(QuotedTweetDroppedFeature, false)
      }
    ),
    invalidSubscriptionTweetFilter,
    InvalidConversationModuleFilter
  )

  override val postFilterFeatureHydration: Seq[
    BaseCandidateFeatureHydrator[Query, TweetCandidate, _]
  ] = Seq(namesFeatureHydrator)

  override val alerts = Seq(
    HomeMixerAlertConfig.BusinessHours.defaultSuccessRateAlert(),
    HomeMixerAlertConfig.BusinessHours.defaultEmptyResponseRateAlert()
  )
}

View File

@ -1,34 +0,0 @@
package com.twitter.home_mixer.candidate_pipeline
import com.twitter.home_mixer.functional_component.feature_hydrator.NamesFeatureHydrator
import com.twitter.home_mixer.functional_component.feature_hydrator.TweetypieFeatureHydrator
import com.twitter.home_mixer.functional_component.filter.InvalidSubscriptionTweetFilter
import com.twitter.product_mixer.component_library.candidate_source.tweetconvosvc.ConversationServiceCandidateSource
import com.twitter.product_mixer.component_library.model.candidate.TweetCandidate
import com.twitter.product_mixer.core.functional_component.decorator.CandidateDecorator
import com.twitter.product_mixer.core.functional_component.gate.BaseGate
import com.twitter.product_mixer.core.pipeline.PipelineQuery
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Injectable factory for [[ConversationServiceCandidatePipelineConfig]].
 *
 * The injected components are fixed at construction time; `build` only lets the caller choose
 * the gates and the decorator.
 */
@Singleton
class ConversationServiceCandidatePipelineConfigBuilder[Query <: PipelineQuery] @Inject() (
  conversationServiceCandidateSource: ConversationServiceCandidateSource,
  tweetypieFeatureHydrator: TweetypieFeatureHydrator,
  invalidSubscriptionTweetFilter: InvalidSubscriptionTweetFilter,
  namesFeatureHydrator: NamesFeatureHydrator) {

  /**
   * Creates a new pipeline config.
   *
   * @param gates gates for the pipeline; none by default
   * @param decorator optional candidate decorator; none by default
   */
  def build(
    gates: Seq[BaseGate[Query]] = Seq.empty,
    decorator: Option[CandidateDecorator[Query, TweetCandidate]] = None
  ): ConversationServiceCandidatePipelineConfig[Query] =
    new ConversationServiceCandidatePipelineConfig[Query](
      conversationServiceCandidateSource = conversationServiceCandidateSource,
      tweetypieFeatureHydrator = tweetypieFeatureHydrator,
      namesFeatureHydrator = namesFeatureHydrator,
      invalidSubscriptionTweetFilter = invalidSubscriptionTweetFilter,
      gates = gates,
      decorator = decorator
    )
}

View File

@ -1,39 +0,0 @@
package com.twitter.home_mixer.candidate_pipeline
import com.twitter.home_mixer.model.HomeFeatures._
import com.twitter.product_mixer.component_library.candidate_source.tweetconvosvc.TweetWithConversationMetadata
import com.twitter.product_mixer.core.feature.Feature
import com.twitter.product_mixer.core.feature.featuremap.FeatureMap
import com.twitter.product_mixer.core.feature.featuremap.FeatureMapBuilder
import com.twitter.product_mixer.core.functional_component.transformer.CandidateFeatureTransformer
import com.twitter.product_mixer.core.model.common.identifier.TransformerIdentifier
import com.twitter.timelineservice.suggests.thriftscala.SuggestType
/**
 * Copies the fields of a [[TweetWithConversationMetadata]] into a [[FeatureMap]] and tags the
 * candidate with the `RankedOrganicTweet` suggest type.
 */
object ConversationServiceResponseFeatureTransformer
    extends CandidateFeatureTransformer[TweetWithConversationMetadata] {

  override val identifier: TransformerIdentifier =
    TransformerIdentifier("ConversationServiceResponse")

  override val features: Set[Feature[_, _]] = Set(
    AuthorIdFeature,
    InReplyToTweetIdFeature,
    IsRetweetFeature,
    SourceTweetIdFeature,
    SourceUserIdFeature,
    ConversationModuleFocalTweetIdFeature,
    AncestorsFeature,
    SuggestTypeFeature
  )

  override def transform(candidate: TweetWithConversationMetadata): FeatureMap = {
    // A tweet is treated as a retweet exactly when it carries a source tweet id.
    val sourceTweetId = candidate.sourceTweetId

    FeatureMapBuilder()
      .add(AuthorIdFeature, candidate.userId)
      .add(InReplyToTweetIdFeature, candidate.inReplyToTweetId)
      .add(IsRetweetFeature, sourceTweetId.nonEmpty)
      .add(SourceTweetIdFeature, sourceTweetId)
      .add(SourceUserIdFeature, candidate.sourceUserId)
      .add(ConversationModuleFocalTweetIdFeature, candidate.conversationId)
      .add(AncestorsFeature, candidate.ancestors)
      .add(SuggestTypeFeature, Some(SuggestType.RankedOrganicTweet))
      .build()
  }
}

View File

@ -1,84 +0,0 @@
package com.twitter.home_mixer.candidate_pipeline
import com.twitter.home_mixer.functional_component.candidate_source.StaleTweetsCacheCandidateSource
import com.twitter.home_mixer.functional_component.decorator.urt.builder.HomeFeedbackActionInfoBuilder
import com.twitter.home_mixer.functional_component.feature_hydrator.NamesFeatureHydrator
import com.twitter.home_mixer.functional_component.query_transformer.EditedTweetsCandidatePipelineQueryTransformer
import com.twitter.home_mixer.service.HomeMixerAlertConfig
import com.twitter.product_mixer.component_library.decorator.urt.UrtItemCandidateDecorator
import com.twitter.product_mixer.component_library.decorator.urt.builder.contextual_ref.ContextualTweetRefBuilder
import com.twitter.product_mixer.component_library.decorator.urt.builder.item.tweet.TweetCandidateUrtItemBuilder
import com.twitter.product_mixer.component_library.decorator.urt.builder.metadata.EmptyClientEventInfoBuilder
import com.twitter.product_mixer.component_library.model.candidate.TweetCandidate
import com.twitter.product_mixer.core.functional_component.candidate_source.BaseCandidateSource
import com.twitter.product_mixer.core.functional_component.decorator.CandidateDecorator
import com.twitter.product_mixer.core.functional_component.feature_hydrator.BaseCandidateFeatureHydrator
import com.twitter.product_mixer.core.functional_component.transformer.CandidatePipelineQueryTransformer
import com.twitter.product_mixer.core.functional_component.transformer.CandidatePipelineResultsTransformer
import com.twitter.product_mixer.core.model.common.identifier.CandidatePipelineIdentifier
import com.twitter.product_mixer.core.model.marshalling.response.rtf.safety_level.TimelineFocalTweetSafetyLevel
import com.twitter.product_mixer.core.model.marshalling.response.urt.contextual_ref.TweetHydrationContext
import com.twitter.product_mixer.core.model.marshalling.response.urt.item.tweet.TweetItem
import com.twitter.product_mixer.core.pipeline.PipelineQuery
import com.twitter.product_mixer.core.pipeline.candidate.DependentCandidatePipelineConfig
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Dependent candidate pipeline config for edited tweets, backed by the Stale Tweets Cache
 * candidate source.
 *
 * The source is queried with a `Seq[Long]` and returns `Long` ids, each of which becomes a
 * [[TweetCandidate]]. The decorator renders every candidate as a URT tweet item that carries
 * an entry id to replace.
 */
@Singleton
case class EditedTweetsCandidatePipelineConfig @Inject() (
  staleTweetsCacheCandidateSource: StaleTweetsCacheCandidateSource,
  namesFeatureHydrator: NamesFeatureHydrator,
  homeFeedbackActionInfoBuilder: HomeFeedbackActionInfoBuilder)
    extends DependentCandidatePipelineConfig[
      PipelineQuery,
      Seq[Long],
      Long,
      TweetCandidate
    ] {

  override val identifier: CandidatePipelineIdentifier =
    CandidatePipelineIdentifier("EditedTweets")

  override val candidateSource: BaseCandidateSource[Seq[Long], Long] =
    staleTweetsCacheCandidateSource

  override val queryTransformer: CandidatePipelineQueryTransformer[
    PipelineQuery,
    Seq[Long]
  ] = EditedTweetsCandidatePipelineQueryTransformer

  override val resultTransformer: CandidatePipelineResultsTransformer[
    Long,
    TweetCandidate
  ] = { tweetId => TweetCandidate(id = tweetId) }

  override val postFilterFeatureHydration: Seq[
    BaseCandidateFeatureHydrator[PipelineQuery, TweetCandidate, _]
  ] = Seq(namesFeatureHydrator)

  override val decorator: Option[CandidateDecorator[PipelineQuery, TweetCandidate]] = {
    // Apply safety level that includes canonical VF treatments that apply regardless of context.
    val hydrationContext = TweetHydrationContext(
      safetyLevelOverride = Some(TimelineFocalTweetSafetyLevel),
      outerTweetContext = None
    )

    val editedTweetItemBuilder = TweetCandidateUrtItemBuilder[PipelineQuery, TweetCandidate](
      clientEventInfoBuilder = EmptyClientEventInfoBuilder,
      // The entry to replace is addressed as "<tweet entry namespace>-<tweet id>".
      entryIdToReplaceBuilder = Some((_, candidate, _) =>
        Some(s"${TweetItem.TweetEntryNamespace}-${candidate.id.toString}")),
      contextualTweetRefBuilder = Some(ContextualTweetRefBuilder(hydrationContext)),
      feedbackActionInfoBuilder = Some(homeFeedbackActionInfoBuilder)
    )

    Some(UrtItemCandidateDecorator(editedTweetItemBuilder))
  }

  override val alerts = Seq(
    HomeMixerAlertConfig.BusinessHours.defaultSuccessRateAlert(99.5, 50, 60, 60)
  )
}

View File

@ -1,123 +0,0 @@
package com.twitter.home_mixer.candidate_pipeline
import com.twitter.conversions.DurationOps._
import com.twitter.home_mixer.functional_component.gate.RequestContextNotGate
import com.twitter.home_mixer.model.HomeFeatures.GetNewerFeature
import com.twitter.home_mixer.model.request.DeviceContext
import com.twitter.home_mixer.model.request.HasDeviceContext
import com.twitter.home_mixer.service.HomeMixerAlertConfig
import com.twitter.product_mixer.component_library.decorator.urt.UrtItemCandidateDecorator
import com.twitter.product_mixer.component_library.decorator.urt.builder.item.alert.DurationParamBuilder
import com.twitter.product_mixer.component_library.decorator.urt.builder.item.alert.ShowAlertCandidateUrtItemBuilder
import com.twitter.product_mixer.component_library.decorator.urt.builder.item.alert.StaticShowAlertColorConfigurationBuilder
import com.twitter.product_mixer.component_library.decorator.urt.builder.item.alert.StaticShowAlertDisplayLocationBuilder
import com.twitter.product_mixer.component_library.decorator.urt.builder.item.alert.StaticShowAlertIconDisplayInfoBuilder
import com.twitter.product_mixer.component_library.gate.FeatureGate
import com.twitter.product_mixer.component_library.model.candidate.ShowAlertCandidate
import com.twitter.product_mixer.core.feature.featuremap.FeatureMap
import com.twitter.product_mixer.core.functional_component.candidate_source.CandidateSource
import com.twitter.product_mixer.core.functional_component.candidate_source.StaticCandidateSource
import com.twitter.product_mixer.core.functional_component.configapi.StaticParam
import com.twitter.product_mixer.core.functional_component.decorator.CandidateDecorator
import com.twitter.product_mixer.core.functional_component.decorator.urt.builder.item.alert.BaseDurationBuilder
import com.twitter.product_mixer.core.functional_component.gate.Gate
import com.twitter.product_mixer.core.functional_component.transformer.CandidatePipelineQueryTransformer
import com.twitter.product_mixer.core.functional_component.transformer.CandidatePipelineResultsTransformer
import com.twitter.product_mixer.core.model.common.identifier.CandidatePipelineIdentifier
import com.twitter.product_mixer.core.model.common.identifier.CandidateSourceIdentifier
import com.twitter.product_mixer.core.model.marshalling.response.urt.alert.NewTweets
import com.twitter.product_mixer.core.model.marshalling.response.urt.alert.ShowAlertColorConfiguration
import com.twitter.product_mixer.core.model.marshalling.response.urt.alert.ShowAlertIconDisplayInfo
import com.twitter.product_mixer.core.model.marshalling.response.urt.alert.Top
import com.twitter.product_mixer.core.model.marshalling.response.urt.alert.UpArrow
import com.twitter.product_mixer.core.model.marshalling.response.urt.color.TwitterBlueRosettaColor
import com.twitter.product_mixer.core.model.marshalling.response.urt.color.WhiteRosettaColor
import com.twitter.product_mixer.core.pipeline.PipelineQuery
import com.twitter.product_mixer.core.pipeline.candidate.DependentCandidatePipelineConfig
import com.twitter.util.Duration
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Candidate Pipeline Config that creates the New Tweets Pill
 *
 * The candidate source is static and always holds a single [[ShowAlertCandidate]]. The
 * decorator renders it as a `NewTweets` alert shown at the top, using the colors, icon,
 * display duration and trigger delay defined in the companion object.
 */
@Singleton
class NewTweetsPillCandidatePipelineConfig[Query <: PipelineQuery with HasDeviceContext] @Inject() (
) extends DependentCandidatePipelineConfig[
      Query,
      Unit,
      ShowAlertCandidate,
      ShowAlertCandidate
    ] {
  import NewTweetsPillCandidatePipelineConfig._

  override val identifier: CandidatePipelineIdentifier =
    CandidatePipelineIdentifier("NewTweetsPill")

  // NOTE(review): by their names these gates skip pull-to-refresh requests and require
  // GetNewerFeature to be set — confirm against the gate implementations.
  override val gates: Seq[Gate[Query]] = Seq(
    RequestContextNotGate(Seq(DeviceContext.RequestContext.PullToRefresh)),
    FeatureGate.fromFeature(GetNewerFeature)
  )

  override val candidateSource: CandidateSource[Unit, ShowAlertCandidate] =
    StaticCandidateSource(
      CandidateSourceIdentifier(identifier.name),
      Seq(ShowAlertCandidate(id = identifier.name, userIds = Seq.empty))
    )

  // The static source takes no input, so the query maps to the unit value. The previous
  // `Unit` referred to the companion object and only type-checked through value discarding.
  override val queryTransformer: CandidatePipelineQueryTransformer[Query, Unit] = { _ => () }

  override val resultTransformer: CandidatePipelineResultsTransformer[
    ShowAlertCandidate,
    ShowAlertCandidate
  ] = { candidate => candidate }

  override val decorator: Option[CandidateDecorator[Query, ShowAlertCandidate]] = {
    // No delay for TweetSelfThread and ManualRefresh request contexts; every other request
    // (including one without a device context) uses TriggerDelay.
    val triggerDelayBuilder = new BaseDurationBuilder[Query] {
      override def apply(
        query: Query,
        candidate: ShowAlertCandidate,
        features: FeatureMap
      ): Option[Duration] = {
        val delay = query.deviceContext.flatMap(_.requestContextValue) match {
          case Some(
                DeviceContext.RequestContext.TweetSelfThread |
                DeviceContext.RequestContext.ManualRefresh) =>
            0.millis
          case _ => TriggerDelay
        }
        Some(delay)
      }
    }

    val homeShowAlertCandidateBuilder = ShowAlertCandidateUrtItemBuilder(
      alertType = NewTweets,
      colorConfigBuilder = StaticShowAlertColorConfigurationBuilder(DefaultColorConfig),
      displayLocationBuilder = StaticShowAlertDisplayLocationBuilder(Top),
      triggerDelayBuilder = Some(triggerDelayBuilder),
      displayDurationBuilder = Some(DurationParamBuilder(StaticParam(DisplayDuration))),
      iconDisplayInfoBuilder = Some(StaticShowAlertIconDisplayInfoBuilder(DefaultIconDisplayInfo))
    )

    Some(UrtItemCandidateDecorator(homeShowAlertCandidateBuilder))
  }

  override val alerts = Seq(
    HomeMixerAlertConfig.BusinessHours.defaultSuccessRateAlert(),
    HomeMixerAlertConfig.BusinessHours.defaultEmptyResponseRateAlert()
  )
}
/** Display constants for the New Tweets Pill alert. */
object NewTweetsPillCandidatePipelineConfig {

  // Twitter-blue background with white text and a white border.
  val DefaultColorConfig: ShowAlertColorConfiguration = ShowAlertColorConfiguration(
    background = TwitterBlueRosettaColor,
    text = WhiteRosettaColor,
    border = Some(WhiteRosettaColor)
  )

  // White up-arrow icon.
  val DefaultIconDisplayInfo: ShowAlertIconDisplayInfo =
    ShowAlertIconDisplayInfo(icon = UpArrow, tint = WhiteRosettaColor)

  // Unlimited display time (until user takes action)
  val DisplayDuration: Duration = -1.millisecond

  // Delay used before the pill triggers unless the request context calls for no delay.
  val TriggerDelay: Duration = 4.minutes
}

View File

@ -1,34 +0,0 @@
package com.twitter.home_mixer.candidate_pipeline
import com.twitter.home_mixer.model.HomeFeatures.AuthorIdFeature
import com.twitter.home_mixer.model.HomeFeatures.InReplyToTweetIdFeature
import com.twitter.home_mixer.model.HomeFeatures.IsRetweetFeature
import com.twitter.home_mixer.model.HomeFeatures.SourceTweetIdFeature
import com.twitter.home_mixer.model.HomeFeatures.SourceUserIdFeature
import com.twitter.product_mixer.core.feature.Feature
import com.twitter.product_mixer.core.feature.featuremap.FeatureMap
import com.twitter.product_mixer.core.feature.featuremap.FeatureMapBuilder
import com.twitter.product_mixer.core.functional_component.transformer.CandidateFeatureTransformer
import com.twitter.product_mixer.core.model.common.identifier.TransformerIdentifier
import com.twitter.timelineservice.{thriftscala => t}
/**
 * Copies author, reply and retweet-source fields from a Timeline Service tweet into a
 * [[FeatureMap]].
 */
object TimelineServiceResponseFeatureTransformer extends CandidateFeatureTransformer[t.Tweet] {

  override val identifier: TransformerIdentifier =
    TransformerIdentifier("TimelineServiceResponse")

  override val features: Set[Feature[_, _]] = Set(
    AuthorIdFeature,
    InReplyToTweetIdFeature,
    IsRetweetFeature,
    SourceTweetIdFeature,
    SourceUserIdFeature
  )

  override def transform(candidate: t.Tweet): FeatureMap = {
    // A tweet is treated as a retweet exactly when it carries a source status id.
    val sourceStatusId = candidate.sourceStatusId

    FeatureMapBuilder()
      .add(AuthorIdFeature, candidate.userId)
      .add(InReplyToTweetIdFeature, candidate.inReplyToStatusId)
      .add(IsRetweetFeature, sourceStatusId.nonEmpty)
      .add(SourceTweetIdFeature, sourceStatusId)
      .add(SourceUserIdFeature, candidate.sourceUserId)
      .build()
  }
}

View File

@ -1,17 +0,0 @@
# Library target built from every *.scala file in this directory.
# Dependencies cover the Finatra Thrift controller, the Home Mixer request marshaller, service
# and Thrift definitions, Product Mixer core services, and Snowflake ids.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
"finatra/thrift/src/main/scala/com/twitter/finatra/thrift:controller",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/marshaller/request",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/service",
"home-mixer/thrift/src/main/thrift:thrift-scala",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/controllers",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/functional_component/configapi",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/service/debug_query",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/service/urt",
"snowflake/src/main/scala/com/twitter/snowflake/id",
],
)

View File

@ -1,51 +0,0 @@
package com.twitter.home_mixer.controller
import com.twitter.finatra.thrift.Controller
import com.twitter.home_mixer.marshaller.request.HomeMixerRequestUnmarshaller
import com.twitter.home_mixer.model.request.HomeMixerRequest
import com.twitter.home_mixer.service.ScoredTweetsService
import com.twitter.home_mixer.{thriftscala => t}
import com.twitter.product_mixer.core.controllers.DebugTwitterContext
import com.twitter.product_mixer.core.functional_component.configapi.ParamsBuilder
import com.twitter.product_mixer.core.service.debug_query.DebugQueryService
import com.twitter.product_mixer.core.service.urt.UrtService
import com.twitter.snowflake.id.SnowflakeId
import com.twitter.stitch.Stitch
import com.twitter.timelines.configapi.Params
import javax.inject.Inject
/**
 * Thrift controller for the HomeMixer service. Handles the `GetUrtResponse` and
 * `GetScoredTweetsResponse` endpoints by unmarshalling the Thrift request, building the
 * request params and running the matching service.
 */
class HomeThriftController @Inject() (
  homeRequestUnmarshaller: HomeMixerRequestUnmarshaller,
  urtService: UrtService,
  scoredTweetsService: ScoredTweetsService,
  paramsBuilder: ParamsBuilder)
    extends Controller(t.HomeMixer)
    with DebugTwitterContext {

  handle(t.HomeMixer.GetUrtResponse) { args: t.HomeMixer.GetUrtResponse.Args =>
    val homeRequest = homeRequestUnmarshaller(args.request)
    val requestParams = buildParams(homeRequest)
    Stitch.run(urtService.getUrtResponse[HomeMixerRequest](homeRequest, requestParams))
  }

  handle(t.HomeMixer.GetScoredTweetsResponse) { args: t.HomeMixer.GetScoredTweetsResponse.Args =>
    val homeRequest = homeRequestUnmarshaller(args.request)
    // Params are built before entering the debug Twitter context, as in the URT handler.
    val requestParams = buildParams(homeRequest)
    withDebugTwitterContext(homeRequest.clientContext) {
      Stitch.run(
        scoredTweetsService.getScoredTweetsResponse[HomeMixerRequest](homeRequest, requestParams))
    }
  }

  /**
   * Builds the params for a request.
   *
   * When the client context has a user id, the custom map gets an "account_age_in_days" entry:
   * the number of days since the time derived from the id, or `Int.MaxValue` when no time can
   * be derived from it. Without a user id the custom map is empty.
   */
  private def buildParams(request: HomeMixerRequest): Params = {
    val context = request.clientContext

    val accountAgeEntry = context.userId.map { userId =>
      val ageInDays = SnowflakeId.timeFromIdOpt(userId) match {
        case Some(createdAt) => createdAt.untilNow.inDays
        case None => Int.MaxValue
      }
      "account_age_in_days" -> ageInDays
    }

    paramsBuilder.build(
      clientContext = context,
      product = request.product,
      featureOverrides = request.debugParams.flatMap(_.featureOverrides).getOrElse(Map.empty),
      fsCustomMapInput = accountAgeEntry.toMap
    )
  }
}

View File

@ -1,24 +0,0 @@
# Library target built from every *.scala file in this directory.
# Dependencies cover the Home Mixer request model and marshaller, Product Mixer pipeline and
# registry code, Gizmoduck and timeline-render Thrift definitions, Stitch, and the Strato
# federation, call-context and generated client-column libraries.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
"home-mixer/server/src/main/scala/com/twitter/home_mixer/marshaller/request",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/model/request",
"home-mixer/thrift/src/main/thrift:thrift-scala",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/functional_component/configapi",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/pipeline/product",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/product/registry",
"product-mixer/core/src/main/thrift/com/twitter/product_mixer/core:thrift-scala",
"src/thrift/com/twitter/gizmoduck:thrift-scala",
"src/thrift/com/twitter/timelines/render:thrift-scala",
"stitch/stitch-repo/src/main/scala",
"strato/config/columns/auth-context:auth-context-strato-client",
"strato/config/columns/gizmoduck:gizmoduck-strato-client",
"strato/config/src/thrift/com/twitter/strato/graphql/timelines:graphql-timelines-scala",
"strato/src/main/scala/com/twitter/strato/callcontext",
"strato/src/main/scala/com/twitter/strato/fed",
"strato/src/main/scala/com/twitter/strato/fed/server",
],
)

View File

@ -1,217 +0,0 @@
package com.twitter.home_mixer.federated
import com.twitter.gizmoduck.{thriftscala => gd}
import com.twitter.home_mixer.marshaller.request.HomeMixerRequestUnmarshaller
import com.twitter.home_mixer.model.request.HomeMixerRequest
import com.twitter.home_mixer.{thriftscala => hm}
import com.twitter.product_mixer.core.functional_component.configapi.ParamsBuilder
import com.twitter.product_mixer.core.pipeline.product.ProductPipelineRequest
import com.twitter.product_mixer.core.pipeline.product.ProductPipelineResult
import com.twitter.product_mixer.core.product.registry.ProductPipelineRegistry
import com.twitter.product_mixer.core.{thriftscala => pm}
import com.twitter.stitch.Arrow
import com.twitter.stitch.Stitch
import com.twitter.strato.callcontext.CallContext
import com.twitter.strato.catalog.OpMetadata
import com.twitter.strato.config._
import com.twitter.strato.data._
import com.twitter.strato.fed.StratoFed
import com.twitter.strato.generated.client.auth_context.AuditIpClientColumn
import com.twitter.strato.generated.client.gizmoduck.CompositeOnUserClientColumn
import com.twitter.strato.graphql.timelines.{thriftscala => gql}
import com.twitter.strato.thrift.ScroogeConv
import com.twitter.timelines.render.{thriftscala => tr}
import com.twitter.util.Try
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Federated Strato column, registered at [[HomeMixerColumn.Path]], that serves timelines via
 * Home Mixer.
 *
 * A fetch converts the GraphQL (key, view) pair into a thrift HomeMixerRequest, unmarshals it,
 * builds params for it, looks up the product pipeline registered for the requested product and
 * runs that pipeline. The timeline of the pipeline result is returned when a result is present;
 * otherwise the fetch yields `missing`.
 */
@Singleton
class HomeMixerColumn @Inject() (
homeMixerRequestUnmarshaller: HomeMixerRequestUnmarshaller,
compositeOnUserClientColumn: CompositeOnUserClientColumn,
auditIpClientColumn: AuditIpClientColumn,
paramsBuilder: ParamsBuilder,
productPipelineRegistry: ProductPipelineRegistry)
extends StratoFed.Column(HomeMixerColumn.Path)
with StratoFed.Fetch.Arrow {
// NOTE(review): every contact field is an empty string in this source - presumably scrubbed; confirm.
override val contactInfo: ContactInfo = ContactInfo(
contactEmail = "",
ldapGroup = "",
slackRoomId = ""
)
override val metadata: OpMetadata =
OpMetadata(
lifecycle = Some(Lifecycle.Production),
description =
Some(Description.PlainText("Federated Strato column for Timelines served via Home Mixer"))
)
private val bouncerAccess: Seq[Policy] = Seq(BouncerAccess())
// NOTE(review): the service identifier pattern holds only empty strings here - presumably scrubbed; confirm.
private val finatraTestServiceIdentifiers: Seq[Policy] = Seq(
ServiceIdentifierPattern(
role = "",
service = "",
env = "",
zone = Seq(""))
)
// Column policy: AnyOf over the bouncer policy and the service identifier patterns above.
override val policy: Policy = AnyOf(bouncerAccess ++ finatraTestServiceIdentifiers)
// Key and View are GraphQL timelines thrift structs; the value is a timelines render Timeline.
override type Key = gql.TimelineKey
override type View = gql.HomeTimelineView
override type Value = tr.Timeline
override val keyConv: Conv[Key] = ScroogeConv.fromStruct[gql.TimelineKey]
override val viewConv: Conv[View] = ScroogeConv.fromStruct[gql.HomeTimelineView]
override val valueConv: Conv[Value] = ScroogeConv.fromStruct[tr.Timeline]
/**
 * Builds the arrow that turns a (key, view) pair into a thrift HomeMixerRequest.
 *
 * The resulting arrow looks up the caller's roles and IP address through the two client
 * columns, then assembles the product, the product context and the client context from the
 * key, the view and CallContext. It throws UnsupportedOperationException for a timeline key
 * that is not one of the cases matched below.
 *
 * Note that both parameters shadow the constructor fields of the same names.
 *
 * @param compositeOnUserClientColumn column used to fetch the user's roles
 * @param auditIpClientColumn column used to fetch the IP address placed in the client context
 */
private def createHomeMixerRequestArrow(
compositeOnUserClientColumn: CompositeOnUserClientColumn,
auditIpClientColumn: AuditIpClientColumn
): Arrow[(Key, View), hm.HomeMixerRequest] = {
val populateUserRolesAndIp: Arrow[(Key, View), (Option[Set[String]], Option[String])] = {
// Only the Roles query field is requested from the gizmoduck column.
val gizmoduckView: (gd.LookupContext, Set[gd.QueryFields]) =
(gd.LookupContext(), Set(gd.QueryFields.Roles))
// Roles are fetched only when CallContext carries a user id; with no user id the
// Option passed to Stitch.collect is empty and the arrow yields None.
val populateUserRoles = Arrow
.flatMap[(Key, View), Option[Set[String]]] { _ =>
Stitch.collect {
CallContext.twitterUserId.map { userId =>
compositeOnUserClientColumn.fetcher
.callStack(HomeMixerColumn.FetchCallstack)
.fetch(userId, gizmoduckView).map(_.v)
.map {
// A missing fetch value and missing roles both collapse to an empty set.
_.flatMap(_.roles.map(_.roles.toSet)).getOrElse(Set.empty)
}
}
}
}
// The IP lookup is keyed and viewed by unit; the input (key, view) is ignored.
val populateIpAddress = Arrow
.flatMap[(Key, View), Option[String]](_ =>
auditIpClientColumn.fetcher
.callStack(HomeMixerColumn.FetchCallstack)
.fetch((), ()).map(_.v))
// Combine both lookups into a single arrow producing the (roles, ip) pair.
Arrow.join(
populateUserRoles,
populateIpAddress
)
}
// zipWithArg keeps the original (key, view) next to the looked-up (roles, ip).
Arrow.zipWithArg(populateUserRolesAndIp).map {
case ((key, view), (roles, ipAddress)) =>
// isPolling is read from CallContext; the remaining fields are copied from the view.
val deviceContextOpt = Some(
hm.DeviceContext(
isPolling = CallContext.isPolling,
requestContext = view.requestContext,
latestControlAvailable = view.latestControlAvailable,
autoplayEnabled = view.autoplayEnabled
))
// NOTE(review): presumably an Option, so an empty collection is treated as absent - confirm.
val seenTweetIds = view.seenTweetIds.filter(_.nonEmpty)
// Map the GraphQL timeline key onto a Home Mixer product and its product context.
val (product, productContext) = key match {
case gql.TimelineKey.HomeTimeline(_) | gql.TimelineKey.HomeTimelineV2(_) =>
(
hm.Product.ForYou,
hm.ProductContext.ForYou(
hm.ForYou(
deviceContextOpt,
seenTweetIds,
view.dspClientContext,
view.pushToHomeTweetId
)
))
case gql.TimelineKey.HomeLatestTimeline(_) | gql.TimelineKey.HomeLatestTimelineV2(_) =>
(
hm.Product.Following,
hm.ProductContext.Following(
hm.Following(deviceContextOpt, seenTweetIds, view.dspClientContext)))
case gql.TimelineKey.CreatorSubscriptionsTimeline(_) =>
(
hm.Product.Subscribed,
hm.ProductContext.Subscribed(hm.Subscribed(deviceContextOpt, seenTweetIds)))
case _ => throw new UnsupportedOperationException(s"Unknown product: $key")
}
// Everything except ipAddress and userRoles is read from CallContext.
val clientContext = pm.ClientContext(
userId = CallContext.twitterUserId,
guestId = CallContext.guestId,
guestIdAds = CallContext.guestIdAds,
guestIdMarketing = CallContext.guestIdMarketing,
appId = CallContext.clientApplicationId,
ipAddress = ipAddress,
userAgent = CallContext.userAgent,
countryCode = CallContext.requestCountryCode,
languageCode = CallContext.requestLanguageCode,
isTwoffice = CallContext.isInternalOrTwoffice,
userRoles = roles,
deviceId = CallContext.deviceId,
mobileDeviceId = CallContext.mobileDeviceId,
mobileDeviceAdId = CallContext.adId,
limitAdTracking = CallContext.limitAdTracking
)
hm.HomeMixerRequest(
clientContext = clientContext,
product = product,
productContext = Some(productContext),
// Try absorbs any exception from `.get` / `.toInt` (presumably an absent count - confirm);
// in that case HomeMixerColumn.MaxCount is used instead.
maxResults = Try(view.count.get.toInt).toOption.orElse(HomeMixerColumn.MaxCount),
// NOTE(review): presumably an Option[String], so an empty cursor is treated as absent - confirm.
cursor = view.cursor.filter(_.nonEmpty)
)
}
}
/**
 * Fetch arrow of the column: builds the pipeline request from (key, view), runs the product
 * pipeline registered for the request's product and converts its result into a Strato result.
 */
override val fetch: Arrow[(Key, View), Result[Value]] = {
// Step 1: thrift request -> unmarshalled request + params -> ProductPipelineRequest.
val transformThriftIntoPipelineRequest: Arrow[
(Key, View),
ProductPipelineRequest[HomeMixerRequest]
] = {
Arrow
.identity[(Key, View)]
.andThen {
createHomeMixerRequestArrow(compositeOnUserClientColumn, auditIpClientColumn)
}
.map {
case thriftRequest =>
val request = homeMixerRequestUnmarshaller(thriftRequest)
// Feature overrides come from the request's debug params; none means an empty map.
val params = paramsBuilder.build(
clientContext = request.clientContext,
product = request.product,
featureOverrides =
request.debugParams.flatMap(_.featureOverrides).getOrElse(Map.empty),
)
ProductPipelineRequest(request, params)
}
}
// Step 2: pick the pipeline arrow for the request's product and pair it with the request;
// applyArrow then yields the pipeline's result for that request.
val underlyingProduct: Arrow[
ProductPipelineRequest[HomeMixerRequest],
ProductPipelineResult[tr.TimelineResponse]
] = Arrow
.identity[ProductPipelineRequest[HomeMixerRequest]]
.map { pipelineRequest =>
val pipelineArrow = productPipelineRegistry
.getProductPipeline[HomeMixerRequest, tr.TimelineResponse](
pipelineRequest.request.product)
.arrow
(pipelineArrow, pipelineRequest)
}.applyArrow
// Step 3: a present pipeline result becomes `found(timeline)`, anything else `missing`.
// `found` and `missing` are not defined in this class - presumably inherited helpers.
transformThriftIntoPipelineRequest.andThen(underlyingProduct).map {
_.result match {
case Some(result) => found(result.timeline)
case _ => missing
}
}
}
}
/** Constants used by the [[HomeMixerColumn]] Strato column. */
object HomeMixerColumn {

  /** Strato path the column is constructed with. */
  val Path: String = "home-mixer/homeMixer.Timeline"

  // Label passed to `callStack` on the downstream column fetches made by the class.
  private val FetchCallstack: String = Path + ":fetch"

  // Fallback for `maxResults` when the view does not provide a usable count.
  private val MaxCount: Option[Int] = Some(100)
}

View File

@ -1,19 +0,0 @@
# Scala library target built from every *.scala file next to this BUILD file, with the
# "fatal_warnings" compiler option set and strict_deps enabled.
# NOTE(review): presumably the home_mixer functional_component/candidate_source package - confirm.
# The product-mixer candidate_source package is listed under both dependencies and exports.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/javax/inject:javax.inject",
"finagle/finagle-memcached/src/main/scala",
"finatra/inject/inject-core/src/main/scala",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/param",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/functional_component/candidate_source",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/pipeline",
"src/thrift/com/twitter/search:earlybird-scala",
"stitch/stitch-timelineservice/src/main/scala",
],
exports = [
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/functional_component/candidate_source",
],
)

View File

@ -1,44 +0,0 @@
package com.twitter.home_mixer.functional_component.candidate_source
import com.twitter.product_mixer.core.feature.FeatureWithDefaultOnFailure
import com.twitter.product_mixer.core.feature.featuremap.FeatureMapBuilder
import com.twitter.product_mixer.core.functional_component.candidate_source.CandidateSourceWithExtractedFeatures
import com.twitter.product_mixer.core.functional_component.candidate_source.CandidatesWithSourceFeatures
import com.twitter.product_mixer.core.model.common.identifier.CandidateSourceIdentifier
import com.twitter.search.earlybird.{thriftscala => t}
import com.twitter.stitch.Stitch
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Source feature on an Earlybird request. EarlybirdCandidateSource sets it to true when the
 * number of returned candidates equals the number of results requested in the search query.
 */
case object EarlybirdResponseTruncatedFeature
extends FeatureWithDefaultOnFailure[t.EarlybirdRequest, Boolean] {
// NOTE(review): presumably the value used when the feature could not be computed - confirm.
override val defaultValue: Boolean = false
}
/**
 * Source feature on an Earlybird request. EarlybirdCandidateSource sets it to the id of the
 * last candidate in the response, or None when the response has no candidates.
 */
case object EarlybirdBottomTweetFeature
extends FeatureWithDefaultOnFailure[t.EarlybirdRequest, Option[Long]] {
// NOTE(review): presumably the value used when the feature could not be computed - confirm.
override val defaultValue: Option[Long] = None
}
/**
 * Candidate source that runs a search on the Earlybird thrift service and returns the search
 * results together with two source-level features derived from them.
 */
@Singleton
case class EarlybirdCandidateSource @Inject() (
  earlybird: t.EarlybirdService.MethodPerEndpoint)
    extends CandidateSourceWithExtractedFeatures[t.EarlybirdRequest, t.ThriftSearchResult] {

  override val identifier = CandidateSourceIdentifier("Earlybird")

  /**
   * Sends `request` to Earlybird and wraps the response.
   *
   * @param request the Earlybird request to execute
   * @return the search results (empty when the response carries none) along with
   *         EarlybirdResponseTruncatedFeature, true when the result count equals the requested
   *         `numResults`, and EarlybirdBottomTweetFeature, the id of the last result if any
   */
  override def apply(
    request: t.EarlybirdRequest
  ): Stitch[CandidatesWithSourceFeatures[t.ThriftSearchResult]] =
    Stitch.callFuture(earlybird.search(request)).map { searchResponse =>
      val results = searchResponse.searchResults.map(_.results).getOrElse(Seq.empty)

      // A response that exactly fills the requested size is flagged as truncated.
      val filledRequestedSize = results.size == request.searchQuery.numResults
      val bottomTweetId = results.lastOption.map(_.id)

      val sourceFeatures = FeatureMapBuilder()
        .add(EarlybirdResponseTruncatedFeature, filledRequestedSize)
        .add(EarlybirdBottomTweetFeature, bottomTweetId)
        .build()

      CandidatesWithSourceFeatures(results, sourceFeatures)
    }
}

View File

@ -1,30 +0,0 @@
package com.twitter.home_mixer.functional_component.candidate_source
import com.google.inject.name.Named
import com.twitter.finagle.memcached.{Client => MemcachedClient}
import com.twitter.home_mixer.param.HomeMixerInjectionNames.StaleTweetsCache
import com.twitter.product_mixer.core.functional_component.candidate_source.CandidateSource
import com.twitter.product_mixer.core.model.common.identifier.CandidateSourceIdentifier
import com.twitter.stitch.Stitch
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Candidate source that looks up tweet ids in the stale tweets memcached cache and returns the
 * ids for which the cache returned an entry.
 */
@Singleton
class StaleTweetsCacheCandidateSource @Inject() (
  @Named(StaleTweetsCache) staleTweetsCache: MemcachedClient)
    extends CandidateSource[Seq[Long], Long] {

  override val identifier: CandidateSourceIdentifier = CandidateSourceIdentifier("StaleTweetsCache")

  // Cache keys are the tweet id prefixed with this version tag.
  private val StaleTweetsCacheKeyPrefix = "v1_"

  /**
   * Looks up the given tweet ids in the cache.
   *
   * @param request tweet ids to check
   * @return the ids whose keys came back from the cache lookup; the cached values are ignored
   */
  override def apply(request: Seq[Long]): Stitch[Seq[Long]] = {
    val keys = request.map(tweetId => s"$StaleTweetsCacheKeyPrefix$tweetId")
    Stitch.callFuture(staleTweetsCache.get(keys).map { tweets =>
      tweets.map {
        // stripPrefix removes the literal prefix; replaceFirst would have treated it as a regex.
        case (k, _) => k.stripPrefix(StaleTweetsCacheKeyPrefix).toLong
      }.toSeq
    })
  }
}

View File

@ -1,28 +0,0 @@
# Scala library target built from every *.scala file next to this BUILD file, with the
# "fatal_warnings" compiler option set and strict_deps enabled.
# NOTE(review): presumably the home_mixer functional_component/decorator package - confirm.
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
"finagle/finagle-core/src/main",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/functional_component/decorator/builder",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/functional_component/decorator/urt/builder",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/model",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/model/request",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/param",
"home-mixer/server/src/main/scala/com/twitter/home_mixer/util",
"product-mixer/component-library/src/main/scala/com/twitter/product_mixer/component_library/decorator/urt",
"product-mixer/component-library/src/main/scala/com/twitter/product_mixer/component_library/model/candidate",
"product-mixer/core/src/main/java/com/twitter/product_mixer/core/product/guice/scope",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/functional_component/decorator/urt/builder",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/functional_component/marshaller/response/urt",
"product-mixer/core/src/main/scala/com/twitter/product_mixer/core/pipeline",
"src/scala/com/twitter/suggests/controller_data",
"src/thrift/com/twitter/suggests/controller_data:controller_data-scala",
"src/thrift/com/twitter/timelinescorer/common/scoredtweetcandidate:thrift-scala",
"src/thrift/com/twitter/timelineservice/server/suggests/logging:thrift-scala",
"stringcenter/client",
"stringcenter/client/src/main/java",
"timelines/src/main/scala/com/twitter/timelines/injection/scribe",
],
)

View File

@ -1,51 +0,0 @@
package com.twitter.home_mixer.functional_component.decorator
import com.twitter.home_mixer.functional_component.decorator.builder.HomeConversationModuleMetadataBuilder
import com.twitter.home_mixer.functional_component.decorator.builder.HomeTimelinesScoreInfoBuilder
import com.twitter.home_mixer.functional_component.decorator.urt.builder.HomeFeedbackActionInfoBuilder
import com.twitter.home_mixer.model.HomeFeatures.ConversationModuleFocalTweetIdFeature
import com.twitter.product_mixer.component_library.decorator.urt.UrtItemCandidateDecorator
import com.twitter.product_mixer.component_library.decorator.urt.UrtMultipleModulesDecorator
import com.twitter.product_mixer.component_library.decorator.urt.builder.item.tweet.TweetCandidateUrtItemBuilder
import com.twitter.product_mixer.component_library.decorator.urt.builder.metadata.ClientEventInfoBuilder
import com.twitter.product_mixer.component_library.decorator.urt.builder.timeline_module.StaticModuleDisplayTypeBuilder
import com.twitter.product_mixer.component_library.decorator.urt.builder.timeline_module.TimelineModuleBuilder
import com.twitter.product_mixer.component_library.model.candidate.TweetCandidate
import com.twitter.product_mixer.core.model.marshalling.response.urt.EntryNamespace
import com.twitter.product_mixer.core.model.marshalling.response.urt.timeline_module.VerticalConversation
import com.twitter.product_mixer.core.pipeline.PipelineQuery
import com.twitter.timelines.injection.scribe.InjectionScribeUtil
import com.twitter.timelineservice.suggests.{thriftscala => st}
/**
 * Factory for the URT decorator used on tweet candidates that are rendered as conversation
 * modules on Home.
 */
object HomeConversationServiceCandidateDecorator {

  private val ConversationModuleNamespace = EntryNamespace("home-conversation")

  /**
   * Builds a multiple-modules decorator that renders tweets as items inside vertical
   * conversation modules, grouping candidates by their ConversationModuleFocalTweetIdFeature.
   *
   * @param homeFeedbackActionInfoBuilder feedback action builder attached to each tweet item
   * @return the decorator, always wrapped in Some
   */
  def apply(
    homeFeedbackActionInfoBuilder: HomeFeedbackActionInfoBuilder
  ): Some[UrtMultipleModulesDecorator[PipelineQuery, TweetCandidate, Long]] = {
    // The scribe component for ranked organic tweets feeds the client event info of both the
    // tweet items and the enclosing module.
    val scribeComponent =
      InjectionScribeUtil.scribeComponent(st.SuggestType.RankedOrganicTweet).get
    val eventInfoBuilder = ClientEventInfoBuilder(scribeComponent)

    val itemBuilder = TweetCandidateUrtItemBuilder(
      clientEventInfoBuilder = eventInfoBuilder,
      timelinesScoreInfoBuilder = Some(HomeTimelinesScoreInfoBuilder),
      feedbackActionInfoBuilder = Some(homeFeedbackActionInfoBuilder)
    )

    val conversationModuleBuilder = TimelineModuleBuilder(
      entryNamespace = ConversationModuleNamespace,
      clientEventInfoBuilder = eventInfoBuilder,
      displayTypeBuilder = StaticModuleDisplayTypeBuilder(VerticalConversation),
      metadataBuilder = Some(HomeConversationModuleMetadataBuilder())
    )

    val modulesDecorator: UrtMultipleModulesDecorator[PipelineQuery, TweetCandidate, Long] =
      UrtMultipleModulesDecorator(
        urtItemCandidateDecorator = UrtItemCandidateDecorator(itemBuilder),
        moduleBuilder = conversationModuleBuilder,
        // Candidates without a focal tweet id feature map to the None key.
        groupByKey = (_, _, features) =>
          features.getOrElse(ConversationModuleFocalTweetIdFeature, None)
      )

    Some(modulesDecorator)
  }
}

View File

@ -1,18 +0,0 @@
package com.twitter.home_mixer.functional_component.decorator
import com.twitter.home_mixer.model.HomeFeatures._
import com.twitter.product_mixer.core.feature.featuremap.FeatureMap
/**
 * Named predicates over a FeatureMap that classify a request by query type: initial load,
 * newer/older pagination, pull to refresh, and launch/foreground request context.
 */
object HomeQueryTypePredicates {

  // Each entry pairs a label with its check. "request" accepts every feature map; the others
  // read a Boolean feature and treat a missing value as false.
  private[this] val QueryPredicates: Seq[(String, FeatureMap => Boolean)] = Seq(
    ("request", _ => true),
    ("get_initial", features => features.getOrElse(GetInitialFeature, false)),
    ("get_newer", features => features.getOrElse(GetNewerFeature, false)),
    ("get_older", features => features.getOrElse(GetOlderFeature, false)),
    ("pull_to_refresh", features => features.getOrElse(PullToRefreshFeature, false)),
    ("request_context_launch", features => features.getOrElse(IsLaunchRequestFeature, false)),
    (
      "request_context_foreground",
      features => features.getOrElse(IsForegroundRequestFeature, false))
  )

  /** The predicates above, keyed by label. */
  val PredicateMap: Map[String, FeatureMap => Boolean] = QueryPredicates.toMap
}

Some files were not shown because too many files have changed in this diff Show More