Delete recos-injector directory

This commit is contained in:
dogemanttv 2024-01-10 17:08:01 -06:00 committed by GitHub
parent 0ae3f56f0f
commit 70f6c18f0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
52 changed files with 0 additions and 3449 deletions

View File

@ -1 +0,0 @@
# This prevents SQ query from grabbing //:all since it traverses up once to find a BUILD (DPB-14048)

View File

@ -1,10 +0,0 @@
; See http://go/CONFIG.ini
[jira]
project: SD
[docbird]
project_name = recos-injector
[kite]
project: recos-injector

View File

@ -1,40 +0,0 @@
# Recos-Injector
Recos-Injector is a streaming event processor used to build input streams for GraphJet-based services. It is a general-purpose tool that consumes arbitrary incoming event streams (e.g., Fav, RT, Follow, client_events, etc.), applies filtering, and combines and publishes cleaned up events to corresponding GraphJet services. Each GraphJet-based service subscribes to a dedicated Kafka topic, and Recos-Injector enables GraphJet-based services to consume any event they want.
## How to run Recos-Injector server tests
You can run tests by using the following command from your project's root directory:
$ bazel build recos-injector/...
$ bazel test recos-injector/...
## How to run recos-injector-server in development on a local machine
The simplest way to stand up a service is to run it locally. To run
recos-injector-server in development mode, compile the project and then
execute it with `bazel run`:
$ bazel build recos-injector/server:bin
$ bazel run recos-injector/server:bin
A tunnel can be set up in order for downstream queries to work properly.
Upon successful server startup, try to `curl` its admin endpoint in another
terminal:
$ curl -s localhost:9990/admin/ping
pong
Run `curl -s localhost:9990/admin` to see a list of all available admin endpoints.
## Querying Recos-Injector server from a Scala console
Recos-Injector does not have a Thrift endpoint. Instead, it reads Event Bus and Kafka queues and writes to the Recos-Injector Kafka.
## Generating a package for deployment
To package your service into a zip file for deployment, run:
$ bazel bundle recos-injector/server:bin --bundle-jvm-archive=zip
If the command is successful, a file named `dist/recos-injector-server.zip` will be created.

View File

@ -1,43 +0,0 @@
target(
name = "server",
tags = ["bazel-compatible"],
dependencies = [
"recos-injector/server/src/main/scala/com/twitter/recosinjector",
],
)
test_suite(
name = "tests",
tags = ["bazel-compatible"],
dependencies = [
"recos-injector/server/src/test/scala/com/twitter/recosinjector",
],
)
jvm_binary(
name = "bin",
basename = "recos-injector-server",
main = "com.twitter.recosinjector.Main",
platform = "java11",
runtime_platform = "java11",
tags = [
"bazel-compatible:migrated",
],
dependencies = [
":server",
"3rdparty/jvm/org/slf4j:slf4j-jdk14",
],
)
jvm_app(
name = "bundle",
basename = "recos-injector",
binary = ":bin",
bundles = [bundle(
fileset = ["config/*"],
owning_target = "recos-injector/server/config:files",
)],
tags = [
"bazel-compatible:migrated",
],
)

View File

@ -1,20 +0,0 @@
resources(
sources = [
"!*.pyc",
"!BUILD*",
"*",
],
tags = ["bazel-compatible"],
)
# Created for Bazel compatibility.
# In Bazel, loose files must be part of a target to be included into a bundle.
# See also http://go/bazel-compatibility/bundle_does_not_match_any_files
files(
name = "files",
sources = [
"!BUILD",
"**/*",
],
tags = ["bazel-compatible"],
)

View File

@ -1,7 +0,0 @@
[Configs]
DCS = all
ROLE = recos-injector
JOB = recos-injector
ENV = prod
PACKAGE = recos-injector-release
PATH = recos-injector

View File

@ -1,11 +0,0 @@
tweet_event_transformer_user_tweet_entity_edges:
comment: "Enables the generation of UserTweetEntity edges in tweet event transformer"
default_availability: 0
enable_emit_tweet_edge_from_reply:
comment: "Decides when processing a Reply edge, whether to generate a Tweet edge for it as well"
default_availability: 0
enable_unfavorite_edge:
comment: "Decides when processing a UnfavoriteEvent from Timeline events, whether to process unfav edges"
default_availability: 0

View File

@ -1,40 +0,0 @@
scala_library(
platform = "java11",
strict_deps = False,
tags = [
"bazel-compatible",
],
dependencies = [
"3rdparty/jvm/io/netty:netty4-tcnative-boringssl-static",
"3rdparty/jvm/org/apache/thrift:libthrift",
"eventbus/client",
"finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/authentication",
"finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/client",
"finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/server",
"finagle/finagle-core/src/main",
"finagle/finagle-http/src/main/scala",
"finagle/finagle-stats",
"finagle/finagle-thriftmux",
"recos-injector/server/config",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/clients",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/config",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/decider",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/edges",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/uua_processors",
"src/thrift/com/twitter/clientapp/gen:clientapp-scala",
"src/thrift/com/twitter/gizmoduck:user-thrift-scala",
"src/thrift/com/twitter/socialgraph:thrift-scala",
"src/thrift/com/twitter/timelineservice/server/internal:thrift-scala",
"src/thrift/com/twitter/tweetypie:events-scala",
"src/thrift/com/twitter/tweetypie:tweet-scala",
"thrift-web-forms",
"twitter-server-internal",
"twitter-server/server/src/main/scala",
"twitter-server/slf4j-jdk14/src/main/scala/com/twitter/server/logging",
"util/util-app",
"util/util-logging/src/main/scala",
"util/util-stats/src/main/scala",
],
)

View File

@ -1,213 +0,0 @@
package com.twitter.recosinjector
import com.twitter.app.Flag
import com.twitter.finagle.http.HttpMuxer
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.frigate.common.util.ElfOwlFilter
import com.twitter.recosinjector.clients.Gizmoduck
import com.twitter.recosinjector.clients.RecosHoseEntitiesCache
import com.twitter.recosinjector.clients.SocialGraph
import com.twitter.recosinjector.clients.Tweetypie
import com.twitter.recosinjector.clients.UrlResolver
import com.twitter.recosinjector.config._
import com.twitter.recosinjector.edges.SocialWriteEventToUserUserGraphBuilder
import com.twitter.recosinjector.edges.TimelineEventToUserTweetEntityGraphBuilder
import com.twitter.recosinjector.edges.TweetEventToUserTweetEntityGraphBuilder
import com.twitter.recosinjector.edges.TweetEventToUserUserGraphBuilder
import com.twitter.recosinjector.edges.UnifiedUserActionToUserVideoGraphBuilder
import com.twitter.recosinjector.edges.UnifiedUserActionToUserAdGraphBuilder
import com.twitter.recosinjector.edges.UnifiedUserActionToUserTweetGraphPlusBuilder
import com.twitter.recosinjector.edges.UserTweetEntityEdgeBuilder
import com.twitter.recosinjector.event_processors.SocialWriteEventProcessor
import com.twitter.recosinjector.event_processors.TimelineEventProcessor
import com.twitter.recosinjector.event_processors.TweetEventProcessor
import com.twitter.recosinjector.publishers.KafkaEventPublisher
import com.twitter.recosinjector.uua_processors.UnifiedUserActionProcessor
import com.twitter.recosinjector.uua_processors.UnifiedUserActionsConsumer
import com.twitter.server.logging.{Logging => JDK14Logging}
import com.twitter.server.Deciderable
import com.twitter.server.TwitterServer
import com.twitter.socialgraph.thriftscala.WriteEvent
import com.twitter.timelineservice.thriftscala.{Event => TimelineEvent}
import com.twitter.tweetypie.thriftscala.TweetEvent
import com.twitter.util.Await
import com.twitter.util.Duration
import java.util.concurrent.TimeUnit
object Main extends TwitterServer with JDK14Logging with Deciderable { self =>
implicit val stats: StatsReceiver = statsReceiver
private val dataCenter: Flag[String] = flag("service.cluster", "atla", "Data Center")
private val serviceRole: Flag[String] = flag("service.role", "Service Role")
private val serviceEnv: Flag[String] = flag("service.env", "Service Env")
private val serviceName: Flag[String] = flag("service.name", "Service Name")
private val shardId = flag("shardId", 0, "Shard ID")
private val numShards = flag("numShards", 1, "Number of shards for this service")
private val truststoreLocation =
flag[String]("truststore_location", "", "Truststore file location")
def main(): Unit = {
val serviceIdentifier = ServiceIdentifier(
role = serviceRole(),
service = serviceName(),
environment = serviceEnv(),
zone = dataCenter()
)
println("ServiceIdentifier = " + serviceIdentifier.toString)
log.info("ServiceIdentifier = " + serviceIdentifier.toString)
val shard = shardId()
val numOfShards = numShards()
val environment = serviceEnv()
implicit val config: DeployConfig = {
environment match {
case "prod" => ProdConfig(serviceIdentifier)(stats)
case "staging" | "devel" => StagingConfig(serviceIdentifier)
case env => throw new Exception(s"Unknown environment $env")
}
}
// Initialize the config and wait for initialization to finish
Await.ready(config.init())
log.info(
"Starting Recos Injector: environment %s, clientId %s",
environment,
config.recosInjectorThriftClientId
)
log.info("Starting shard Id: %d of %d shards...".format(shard, numOfShards))
// Client wrappers
val cache = new RecosHoseEntitiesCache(config.recosInjectorCoreSvcsCacheClient)
val gizmoduck = new Gizmoduck(config.userStore)
val socialGraph = new SocialGraph(config.socialGraphIdStore)
val tweetypie = new Tweetypie(config.tweetyPieStore)
val urlResolver = new UrlResolver(config.urlInfoStore)
// Edge builders
val userTweetEntityEdgeBuilder = new UserTweetEntityEdgeBuilder(cache, urlResolver)
// Publishers
val kafkaEventPublisher = KafkaEventPublisher(
"/s/kafka/recommendations:kafka-tls",
config.outputKafkaTopicPrefix,
config.recosInjectorThriftClientId,
truststoreLocation())
// Message Builders
val socialWriteToUserUserMessageBuilder =
new SocialWriteEventToUserUserGraphBuilder()(
statsReceiver.scope("SocialWriteEventToUserUserGraphBuilder")
)
val timelineToUserTweetEntityMessageBuilder = new TimelineEventToUserTweetEntityGraphBuilder(
userTweetEntityEdgeBuilder = userTweetEntityEdgeBuilder
)(statsReceiver.scope("TimelineEventToUserTweetEntityGraphBuilder"))
val tweetEventToUserTweetEntityGraphBuilder = new TweetEventToUserTweetEntityGraphBuilder(
userTweetEntityEdgeBuilder = userTweetEntityEdgeBuilder,
tweetCreationStore = config.tweetCreationStore,
decider = config.recosInjectorDecider
)(statsReceiver.scope("TweetEventToUserTweetEntityGraphBuilder"))
val socialWriteEventProcessor = new SocialWriteEventProcessor(
eventBusStreamName = s"recos_injector_social_write_event_$environment",
thriftStruct = WriteEvent,
serviceIdentifier = serviceIdentifier,
kafkaEventPublisher = kafkaEventPublisher,
userUserGraphTopic = KafkaEventPublisher.UserUserTopic,
userUserGraphMessageBuilder = socialWriteToUserUserMessageBuilder
)(statsReceiver.scope("SocialWriteEventProcessor"))
val tweetToUserUserMessageBuilder = new TweetEventToUserUserGraphBuilder()(
statsReceiver.scope("TweetEventToUserUserGraphBuilder")
)
val unifiedUserActionToUserVideoGraphBuilder = new UnifiedUserActionToUserVideoGraphBuilder(
userTweetEntityEdgeBuilder = userTweetEntityEdgeBuilder
)(statsReceiver.scope("UnifiedUserActionToUserVideoGraphBuilder"))
val unifiedUserActionToUserAdGraphBuilder = new UnifiedUserActionToUserAdGraphBuilder(
userTweetEntityEdgeBuilder = userTweetEntityEdgeBuilder
)(statsReceiver.scope("UnifiedUserActionToUserAdGraphBuilder"))
val unifiedUserActionToUserTweetGraphPlusBuilder =
new UnifiedUserActionToUserTweetGraphPlusBuilder(
userTweetEntityEdgeBuilder = userTweetEntityEdgeBuilder
)(statsReceiver.scope("UnifiedUserActionToUserTweetGraphPlusBuilder"))
// Processors
val tweetEventProcessor = new TweetEventProcessor(
eventBusStreamName = s"recos_injector_tweet_events_$environment",
thriftStruct = TweetEvent,
serviceIdentifier = serviceIdentifier,
userUserGraphMessageBuilder = tweetToUserUserMessageBuilder,
userUserGraphTopic = KafkaEventPublisher.UserUserTopic,
userTweetEntityGraphMessageBuilder = tweetEventToUserTweetEntityGraphBuilder,
userTweetEntityGraphTopic = KafkaEventPublisher.UserTweetEntityTopic,
kafkaEventPublisher = kafkaEventPublisher,
socialGraph = socialGraph,
tweetypie = tweetypie,
gizmoduck = gizmoduck
)(statsReceiver.scope("TweetEventProcessor"))
val timelineEventProcessor = new TimelineEventProcessor(
eventBusStreamName = s"recos_injector_timeline_events_prototype_$environment",
thriftStruct = TimelineEvent,
serviceIdentifier = serviceIdentifier,
kafkaEventPublisher = kafkaEventPublisher,
userTweetEntityGraphTopic = KafkaEventPublisher.UserTweetEntityTopic,
userTweetEntityGraphMessageBuilder = timelineToUserTweetEntityMessageBuilder,
decider = config.recosInjectorDecider,
gizmoduck = gizmoduck,
tweetypie = tweetypie
)(statsReceiver.scope("TimelineEventProcessor"))
val eventBusProcessors = Seq(
timelineEventProcessor,
socialWriteEventProcessor,
tweetEventProcessor
)
val uuaProcessor = new UnifiedUserActionProcessor(
gizmoduck = gizmoduck,
tweetypie = tweetypie,
kafkaEventPublisher = kafkaEventPublisher,
userVideoGraphTopic = KafkaEventPublisher.UserVideoTopic,
userVideoGraphBuilder = unifiedUserActionToUserVideoGraphBuilder,
userAdGraphTopic = KafkaEventPublisher.UserAdTopic,
userAdGraphBuilder = unifiedUserActionToUserAdGraphBuilder,
userTweetGraphPlusTopic = KafkaEventPublisher.UserTweetPlusTopic,
userTweetGraphPlusBuilder = unifiedUserActionToUserTweetGraphPlusBuilder)(
statsReceiver.scope("UnifiedUserActionProcessor"))
val uuaConsumer = new UnifiedUserActionsConsumer(uuaProcessor, truststoreLocation())
// Start-up init and graceful shutdown setup
// wait a bit for services to be ready
Thread.sleep(5000L)
log.info("Starting the event processors")
eventBusProcessors.foreach(_.start())
log.info("Starting the uua processors")
uuaConsumer.atLeastOnceProcessor.start()
this.addAdminRoute(ElfOwlFilter.getPostbackRoute())
onExit {
log.info("Shutting down the event processors")
eventBusProcessors.foreach(_.stop())
log.info("Shutting down the uua processors")
uuaConsumer.atLeastOnceProcessor.close()
log.info("done exit")
}
// Wait on the thriftServer so that shutdownTimeout is respected.
Await.result(adminHttpServer)
}
}

View File

@ -1,20 +0,0 @@
scala_library(
platform = "java11",
strict_deps = False,
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/twitter/storehaus:core",
"finagle/finagle-memcached/src/main/scala",
"finagle/finagle-stats",
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/base",
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/util",
"servo/repo/src/main/scala",
"src/thrift/com/twitter/gizmoduck:thrift-scala",
"src/thrift/com/twitter/gizmoduck:user-thrift-scala",
"src/thrift/com/twitter/recos:recos-internal-scala",
"src/thrift/com/twitter/socialgraph:thrift-scala",
"src/thrift/com/twitter/tweetypie:service-scala",
"stitch/stitch-tweetypie/src/main/scala",
"util/util-logging/src/main/scala",
],
)

View File

@ -1,26 +0,0 @@
package com.twitter.recosinjector.clients
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.gizmoduck.thriftscala.User
import com.twitter.logging.Logger
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Future
class Gizmoduck(
userStore: ReadableStore[Long, User]
)(
implicit statsReceiver: StatsReceiver) {
private val log = Logger()
private val stats = statsReceiver.scope(this.getClass.getSimpleName)
def getUser(userId: Long): Future[Option[User]] = {
userStore
.get(userId)
.rescue {
case e =>
stats.scope("getUserFailure").counter(e.getClass.getSimpleName).incr()
log.error(s"Failed with message ${e.toString}")
Future.None
}
}
}

View File

@ -1,137 +0,0 @@
package com.twitter.recosinjector.clients
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.memcached.Client
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.io.Buf
import com.twitter.recos.internal.thriftscala.{RecosHoseEntities, RecosHoseEntity}
import com.twitter.servo.cache.ThriftSerializer
import com.twitter.util.{Duration, Future, Time}
import org.apache.thrift.protocol.TBinaryProtocol
case class CacheEntityEntry(
cachePrefix: String,
hashedEntityId: Int,
entity: String) {
val fullKey: String = cachePrefix + hashedEntityId
}
object RecosHoseEntitiesCache {
val EntityTTL: Duration = 30.hours
val EntitiesSerializer =
new ThriftSerializer[RecosHoseEntities](RecosHoseEntities, new TBinaryProtocol.Factory())
val HashtagPrefix: String = "h"
val UrlPrefix: String = "u"
}
/**
* A cache layer to store entities.
* Graph services like user_tweet_entity_graph and user_url_graph store user interactions with
* entities in a tweet, such as HashTags and URLs. These entities are string values that can be
* potentially very big. Therefore, we instead store a hashed id in the graph edge, and keep a
* (hashedId -> entity) mapping in this cache. The actual entity values can be recovered
* by the graph service at serving time using this cache.
*/
class RecosHoseEntitiesCache(client: Client) {
import RecosHoseEntitiesCache._
private def isEntityWithinTTL(entity: RecosHoseEntity, ttlInMillis: Long): Boolean = {
entity.timestamp.exists(timestamp => Time.now.inMilliseconds - timestamp <= ttlInMillis)
}
/**
* Add a new RecosHoseEntity into RecosHoseEntities
*/
private def updateRecosHoseEntities(
existingEntitiesOpt: Option[RecosHoseEntities],
newEntityString: String,
stats: StatsReceiver
): RecosHoseEntities = {
val existingEntities = existingEntitiesOpt.map(_.entities).getOrElse(Nil)
// Discard expired and duplicate existing entities
val validExistingEntities = existingEntities
.filter(entity => isEntityWithinTTL(entity, EntityTTL.inMillis))
.filter(_.entity != newEntityString)
val newRecosHoseEntity = RecosHoseEntity(newEntityString, Some(Time.now.inMilliseconds))
RecosHoseEntities(validExistingEntities :+ newRecosHoseEntity)
}
private def getRecosHoseEntitiesCache(
cacheEntries: Seq[CacheEntityEntry],
stats: StatsReceiver
): Future[Map[String, Option[RecosHoseEntities]]] = {
client
.get(cacheEntries.map(_.fullKey))
.map(_.map {
case (cacheKey, buf) =>
val recosHoseEntitiesTry = EntitiesSerializer.from(Buf.ByteArray.Owned.extract(buf))
if (recosHoseEntitiesTry.isThrow) {
stats.counter("cache_get_deserialization_failure").incr()
}
cacheKey -> recosHoseEntitiesTry.toOption
})
.onSuccess { _ => stats.counter("get_cache_success").incr() }
.onFailure { ex =>
stats.scope("get_cache_failure").counter(ex.getClass.getSimpleName).incr()
}
}
private def putRecosHoseEntitiesCache(
cacheKey: String,
recosHoseEntities: RecosHoseEntities,
stats: StatsReceiver
): Unit = {
val serialized = EntitiesSerializer.to(recosHoseEntities)
if (serialized.isThrow) {
stats.counter("cache_put_serialization_failure").incr()
}
serialized.toOption.map { bytes =>
client
.set(cacheKey, 0, EntityTTL.fromNow, Buf.ByteArray.Owned(bytes))
.onSuccess { _ => stats.counter("put_cache_success").incr() }
.onFailure { ex =>
stats.scope("put_cache_failure").counter(ex.getClass.getSimpleName).incr()
}
}
}
/**
* Store a list of new entities into the cache by their cacheKeys, and remove expired/invalid
* values in the existing cache entries at the same time
*/
def updateEntitiesCache(
newCacheEntries: Seq[CacheEntityEntry],
stats: StatsReceiver
): Future[Unit] = {
stats.counter("update_cache_request").incr()
getRecosHoseEntitiesCache(newCacheEntries, stats)
.map { existingCacheEntries =>
newCacheEntries.foreach { newCacheEntry =>
val fullKey = newCacheEntry.fullKey
val existingRecosHoseEntities = existingCacheEntries.get(fullKey).flatten
stats.stat("num_existing_entities").add(existingRecosHoseEntities.size)
if (existingRecosHoseEntities.isEmpty) {
stats.counter("existing_entities_empty").incr()
}
val updatedRecosHoseEntities = updateRecosHoseEntities(
existingRecosHoseEntities,
newCacheEntry.entity,
stats
)
stats.stat("num_updated_entities").add(updatedRecosHoseEntities.entities.size)
if (updatedRecosHoseEntities.entities.nonEmpty) {
putRecosHoseEntitiesCache(fullKey, updatedRecosHoseEntities, stats)
}
}
}
.onSuccess { _ => stats.counter("update_cache_success").incr() }
.onFailure { ex =>
stats.scope("update_cache_failure").counter(ex.getClass.getSimpleName).incr()
}
}
}

View File

@ -1,80 +0,0 @@
package com.twitter.recosinjector.clients
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.logging.Logger
import com.twitter.socialgraph.thriftscala._
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Future
class SocialGraph(
socialGraphIdStore: ReadableStore[IdsRequest, IdsResult]
)(
implicit statsReceiver: StatsReceiver) {
import SocialGraph._
private val log = Logger()
private val followedByNotMutedByStats = statsReceiver.scope("followedByNotMutedBy")
private def fetchIdsFromSocialGraph(
userId: Long,
ids: Seq[Long],
relationshipTypes: Map[RelationshipType, Boolean],
lookupContext: Option[LookupContext] = IncludeInactiveUnionLookupContext,
stats: StatsReceiver
): Future[Seq[Long]] = {
if (ids.isEmpty) {
stats.counter("fetchIdsEmpty").incr()
Future.Nil
} else {
val relationships = relationshipTypes.map {
case (relationshipType, hasRelationship) =>
SrcRelationship(
source = userId,
relationshipType = relationshipType,
hasRelationship = hasRelationship,
targets = Some(ids)
)
}.toSeq
val idsRequest = IdsRequest(
relationships = relationships,
pageRequest = SelectAllPageRequest,
context = lookupContext
)
socialGraphIdStore
.get(idsRequest)
.map { _.map(_.ids).getOrElse(Nil) }
.rescue {
case e =>
stats.scope("fetchIdsFailure").counter(e.getClass.getSimpleName).incr()
log.error(s"Failed with message ${e.toString}")
Future.Nil
}
}
}
// which of the users in candidates follow userId and have not muted userId
def followedByNotMutedBy(userId: Long, candidates: Seq[Long]): Future[Seq[Long]] = {
fetchIdsFromSocialGraph(
userId,
candidates,
FollowedByNotMutedRelationships,
IncludeInactiveLookupContext,
followedByNotMutedByStats
)
}
}
object SocialGraph {
val SelectAllPageRequest = Some(PageRequest(selectAll = Some(true)))
val IncludeInactiveLookupContext = Some(LookupContext(includeInactive = true))
val IncludeInactiveUnionLookupContext = Some(
LookupContext(includeInactive = true, performUnion = Some(true))
)
val FollowedByNotMutedRelationships: Map[RelationshipType, Boolean] = Map(
RelationshipType.FollowedBy -> true,
RelationshipType.MutedBy -> false
)
}

View File

@ -1,30 +0,0 @@
package com.twitter.recosinjector.clients
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.stitch.tweetypie.TweetyPie.{TweetyPieException, TweetyPieResult}
import com.twitter.storehaus.ReadableStore
import com.twitter.tweetypie.thriftscala.Tweet
import com.twitter.util.Future
class Tweetypie(
tweetyPieStore: ReadableStore[Long, TweetyPieResult]
)(
implicit statsReceiver: StatsReceiver) {
private val stats = statsReceiver.scope(this.getClass.getSimpleName)
private val failureStats = stats.scope("getTweetFailure")
def getTweet(tweetId: Long): Future[Option[Tweet]] = {
tweetyPieStore
.get(tweetId)
.map { _.map(_.tweet) }
.rescue {
case e: TweetyPieException =>
// Usually results from trying to query a protected or unsafe tweet
failureStats.scope("TweetyPieException").counter(e.result.tweetState.toString).incr()
Future.None
case e =>
failureStats.counter(e.getClass.getSimpleName).incr()
Future.None
}
}
}

View File

@ -1,105 +0,0 @@
package com.twitter.recosinjector.clients
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.util.DefaultTimer
import com.twitter.frigate.common.util.{SnowflakeUtils, UrlInfo}
import com.twitter.storehaus.{FutureOps, ReadableStore}
import com.twitter.util.{Duration, Future, Timer}
class UrlResolver(
urlInfoStore: ReadableStore[String, UrlInfo]
)(
implicit statsReceiver: StatsReceiver) {
private val EmptyFutureMap = Future.value(Map.empty[String, String])
private val stats = statsReceiver.scope(this.getClass.getSimpleName)
private val twitterResolvedUrlCounter = stats.counter("twitterResolvedUrl")
private val resolvedUrlCounter = stats.counter("resolvedUrl")
private val noResolvedUrlCounter = stats.counter("noResolvedUrl")
private val numNoDelayCounter = stats.counter("urlResolver_no_delay")
private val numDelayCounter = stats.counter("urlResolver_delay")
implicit val timer: Timer = DefaultTimer
/**
* Get the resolved URL map of the input raw URLs
*
* @param rawUrls list of raw URLs to query
* @return map of raw URL to resolved URL
*/
def getResolvedUrls(rawUrls: Set[String]): Future[Map[String, String]] = {
FutureOps
.mapCollect(urlInfoStore.multiGet[String](rawUrls))
.map { resolvedUrlsMap =>
resolvedUrlsMap.flatMap {
case (
url,
Some(
UrlInfo(
Some(resolvedUrl),
Some(_),
Some(domain),
_,
_,
_,
_,
Some(_),
_,
_,
_,
_))) =>
if (domain == "Twitter") { // Filter out Twitter based URLs
twitterResolvedUrlCounter.incr()
None
} else {
resolvedUrlCounter.incr()
Some(url -> resolvedUrl)
}
case _ =>
noResolvedUrlCounter.incr()
None
}
}
}
/**
* Get resolved url maps given a list of urls, grouping urls that point to the same webpage
*/
def getResolvedUrls(urls: Seq[String], tweetId: Long): Future[Map[String, String]] = {
if (urls.isEmpty) {
EmptyFutureMap
} else {
Future
.sleep(getUrlResolverDelayDuration(tweetId))
.before(getResolvedUrls(urls.toSet))
}
}
/**
* Given a tweet, return the amount of delay needed before attempting to resolve the Urls
*/
private def getUrlResolverDelayDuration(
tweetId: Long
): Duration = {
val urlResolverDelaySinceCreation = 12.seconds
val urlResolverDelayDuration = 4.seconds
val noDelay = 0.seconds
// Check whether the tweet was created more than the specified delay duration before now.
// If the tweet ID is not based on Snowflake, this is false, and the delay is applied.
val isCreatedBeforeDelayThreshold = SnowflakeUtils
.tweetCreationTime(tweetId)
.map(_.untilNow)
.exists(_ > urlResolverDelaySinceCreation)
if (isCreatedBeforeDelayThreshold) {
numNoDelayCounter.incr()
noDelay
} else {
numDelayCounter.incr()
urlResolverDelayDuration
}
}
}

View File

@ -1,36 +0,0 @@
scala_library(
platform = "java11",
strict_deps = False,
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/twitter/bijection:scrooge",
"3rdparty/jvm/com/twitter/storehaus:core",
"abdecider",
"decider/src/main/scala",
"finagle/finagle-memcached/src/main/scala",
"finagle/finagle-stats",
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store",
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/util",
"hermit/hermit-core:store",
"hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common",
"hermit/hermit-core/src/main/scala/com/twitter/hermit/store/gizmoduck",
"hermit/hermit-core/src/main/scala/com/twitter/hermit/store/tweetypie",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/decider",
"scribelib/validators/src/main/scala/com/twitter/scribelib/validators",
"src/scala/com/twitter/storehaus_internal/memcache",
"src/scala/com/twitter/storehaus_internal/memcache/config",
"src/scala/com/twitter/storehaus_internal/util",
"src/thrift/com/twitter/gizmoduck:thrift-java",
"src/thrift/com/twitter/gizmoduck:thrift-scala",
"src/thrift/com/twitter/gizmoduck:user-thrift-scala",
"src/thrift/com/twitter/socialgraph:thrift-scala",
"src/thrift/com/twitter/spam/rtf:safety-level-scala",
"src/thrift/com/twitter/tweetypie:service-scala",
"stitch/stitch-core",
"stitch/stitch-socialgraph",
"stitch/stitch-storehaus/src/main/scala",
"stitch/stitch-tweetypie/src/main/scala",
"util/util-hashing/src/main/scala",
"util/util-logging/src/main/scala",
],
)

View File

@ -1,23 +0,0 @@
package com.twitter.recosinjector.config
import com.twitter.finagle.memcached.Client
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.storehaus_internal.memcache.MemcacheStore
import com.twitter.storehaus_internal.util.{ClientName, ZkEndPoint}
trait CacheConfig {
implicit def statsReceiver: StatsReceiver
def serviceIdentifier: ServiceIdentifier
def recosInjectorCoreSvcsCacheDest: String
val recosInjectorCoreSvcsCacheClient: Client = MemcacheStore.memcachedClient(
name = ClientName("memcache-recos-injector"),
dest = ZkEndPoint(recosInjectorCoreSvcsCacheDest),
statsReceiver = statsReceiver,
serviceIdentifier = serviceIdentifier
)
}

View File

@ -1,41 +0,0 @@
package com.twitter.recosinjector.config
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.thrift.ClientId
import com.twitter.frigate.common.store.TweetCreationTimeMHStore
import com.twitter.frigate.common.util.UrlInfo
import com.twitter.gizmoduck.thriftscala.User
import com.twitter.recosinjector.decider.RecosInjectorDecider
import com.twitter.socialgraph.thriftscala.{IdsRequest, IdsResult}
import com.twitter.stitch.tweetypie.TweetyPie.TweetyPieResult
import com.twitter.storehaus.ReadableStore
import com.twitter.util.Future
trait Config { self =>
implicit def statsReceiver: StatsReceiver
// ReadableStores
def tweetyPieStore: ReadableStore[Long, TweetyPieResult]
def userStore: ReadableStore[Long, User]
def socialGraphIdStore: ReadableStore[IdsRequest, IdsResult]
def urlInfoStore: ReadableStore[String, UrlInfo]
// Manhattan stores
def tweetCreationStore: TweetCreationTimeMHStore
// Decider
def recosInjectorDecider: RecosInjectorDecider
// Constants
def recosInjectorThriftClientId: ClientId
def serviceIdentifier: ServiceIdentifier
def outputKafkaTopicPrefix: String
def init(): Future[Unit] = Future.Done
}

View File

@ -1,215 +0,0 @@
package com.twitter.recosinjector.config
import com.twitter.bijection.scrooge.BinaryScalaCodec
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.client.ClientRegistry
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.frigate.common.store.TweetCreationTimeMHStore
import com.twitter.frigate.common.util.Finagle._
import com.twitter.frigate.common.util.{UrlInfo, UrlInfoInjection, UrlResolver}
import com.twitter.gizmoduck.thriftscala.{LookupContext, QueryFields, User, UserService}
import com.twitter.hermit.store.common.{ObservedCachedReadableStore, ObservedMemcachedReadableStore}
import com.twitter.hermit.store.gizmoduck.GizmoduckUserStore
import com.twitter.hermit.store.tweetypie.TweetyPieStore
import com.twitter.logging.Logger
import com.twitter.pink_floyd.thriftscala.{ClientIdentifier, Storer}
import com.twitter.socialgraph.thriftscala.{IdsRequest, SocialGraphService}
import com.twitter.spam.rtf.thriftscala.SafetyLevel
import com.twitter.stitch.socialgraph.SocialGraph
import com.twitter.stitch.storehaus.ReadableStoreOfStitch
import com.twitter.stitch.tweetypie.TweetyPie.TweetyPieResult
import com.twitter.storage.client.manhattan.kv.{
ManhattanKVClient,
ManhattanKVClientMtlsParams,
ManhattanKVEndpointBuilder
}
import com.twitter.storehaus.ReadableStore
import com.twitter.tweetypie.thriftscala.{GetTweetOptions, TweetService}
import com.twitter.util.Future
/*
* Any finagle clients should not be defined as lazy. If defined lazy,
* ClientRegistry.expAllRegisteredClientsResolved() call in init will not ensure that the clients
* are active before thrift endpoint is active. We want the clients to be active, because zookeeper
* resolution triggered by first request(s) might result in the request(s) failing.
*/
trait DeployConfig extends Config with CacheConfig {
implicit def statsReceiver: StatsReceiver
def log: Logger
// Clients
val gizmoduckClient = new UserService.FinagledClient(
readOnlyThriftService(
"gizmoduck",
"/s/gizmoduck/gizmoduck",
statsReceiver,
recosInjectorThriftClientId,
requestTimeout = 450.milliseconds,
mTLSServiceIdentifier = Some(serviceIdentifier)
)
)
val tweetyPieClient = new TweetService.FinagledClient(
readOnlyThriftService(
"tweetypie",
"/s/tweetypie/tweetypie",
statsReceiver,
recosInjectorThriftClientId,
requestTimeout = 450.milliseconds,
mTLSServiceIdentifier = Some(serviceIdentifier)
)
)
val sgsClient = new SocialGraphService.FinagledClient(
readOnlyThriftService(
"socialgraph",
"/s/socialgraph/socialgraph",
statsReceiver,
recosInjectorThriftClientId,
requestTimeout = 450.milliseconds,
mTLSServiceIdentifier = Some(serviceIdentifier)
)
)
val pinkStoreClient = new Storer.FinagledClient(
readOnlyThriftService(
"pink_store",
"/s/spiderduck/pink-store",
statsReceiver,
recosInjectorThriftClientId,
requestTimeout = 450.milliseconds,
mTLSServiceIdentifier = Some(serviceIdentifier)
)
)
// Stores
private val _gizmoduckStore = {
val queryFields: Set[QueryFields] = Set(
QueryFields.Discoverability,
QueryFields.Labels,
QueryFields.Safety
)
val context: LookupContext = LookupContext(
includeDeactivated = true,
safetyLevel = Some(SafetyLevel.Recommendations)
)
GizmoduckUserStore(
client = gizmoduckClient,
queryFields = queryFields,
context = context,
statsReceiver = statsReceiver
)
}
override val userStore: ReadableStore[Long, User] = {
// memcache based cache
ObservedMemcachedReadableStore.fromCacheClient(
backingStore = _gizmoduckStore,
cacheClient = recosInjectorCoreSvcsCacheClient,
ttl = 2.hours
)(
valueInjection = BinaryScalaCodec(User),
statsReceiver = statsReceiver.scope("UserStore"),
keyToString = { k: Long =>
s"usri/$k"
}
)
}
/**
* TweetyPie store, used to fetch tweet objects when unavailable, and also as a source of
* tweet SafetyLevel filtering.
* Note: we do NOT cache TweetyPie calls, as it makes tweet SafetyLevel filtering less accurate.
* TweetyPie QPS is < 20K/cluster.
* More info is here:
* https://cgit.twitter.biz/source/tree/src/thrift/com/twitter/spam/rtf/safety_level.thrift
*/
override val tweetyPieStore: ReadableStore[Long, TweetyPieResult] = {
val getTweetOptions = Some(
GetTweetOptions(
includeCards = true,
safetyLevel = Some(SafetyLevel.RecosWritePath)
)
)
TweetyPieStore(
tweetyPieClient,
getTweetOptions,
convertExceptionsToNotFound = false // Do not suppress TweetyPie errors. Leave it to caller
)
}
private val _urlInfoStore = {
//Initialize pink store client, for parsing url
UrlResolver(
pinkStoreClient,
statsReceiver.scope("urlFetcher"),
clientId = ClientIdentifier.Recoshose
)
}
override val urlInfoStore: ReadableStore[String, UrlInfo] = {
// memcache based cache
val memcachedStore = ObservedMemcachedReadableStore.fromCacheClient(
backingStore = _urlInfoStore,
cacheClient = recosInjectorCoreSvcsCacheClient,
ttl = 2.hours
)(
valueInjection = UrlInfoInjection,
statsReceiver = statsReceiver.scope("UrlInfoStore"),
keyToString = { k: String =>
s"uisri/$k"
}
)
ObservedCachedReadableStore.from(
memcachedStore,
ttl = 1.minutes,
maxKeys = 1e5.toInt,
windowSize = 10000L,
cacheName = "url_store_in_proc_cache"
)(statsReceiver.scope("url_store_in_proc_cache"))
}
override val socialGraphIdStore = ReadableStoreOfStitch { idsRequest: IdsRequest =>
SocialGraph(sgsClient).ids(idsRequest)
}
/**
* MH Store for updating the last time user created a tweet
*/
val tweetCreationStore: TweetCreationTimeMHStore = {
val client = ManhattanKVClient(
appId = "recos_tweet_creation_info",
dest = "/s/manhattan/omega.native-thrift",
mtlsParams = ManhattanKVClientMtlsParams(serviceIdentifier)
)
val endpoint = ManhattanKVEndpointBuilder(client)
.defaultMaxTimeout(700.milliseconds)
.statsReceiver(
statsReceiver
.scope(serviceIdentifier.zone)
.scope(serviceIdentifier.environment)
.scope("recos_injector_tweet_creation_info_store")
)
.build()
val dataset = if (serviceIdentifier.environment == "prod") {
"recos_injector_tweet_creation_info"
} else {
"recos_injector_tweet_creation_info_staging"
}
new TweetCreationTimeMHStore(
cluster = serviceIdentifier.zone,
endpoint = endpoint,
dataset = dataset,
writeTtl = Some(14.days),
statsReceiver.scope("recos_injector_tweet_creation_info_store")
)
}
// wait for all serversets to populate
override def init(): Future[Unit] = ClientRegistry.expAllRegisteredClientsResolved().unit
}

View File

@ -1,29 +0,0 @@
package com.twitter.recosinjector.config
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.thrift.ClientId
import com.twitter.logging.Logger
import com.twitter.recosinjector.decider.RecosInjectorDecider
case class ProdConfig(
override val serviceIdentifier: ServiceIdentifier
)(implicit val statsReceiver: StatsReceiver) extends {
// Due to trait initialization logic in Scala, any abstract members declared in Config or
// DeployConfig should be declared in this block. Otherwise the abstract member might initialize
// to null if invoked before before object creation finishing.
val recosInjectorThriftClientId = ClientId("recos-injector.prod")
val outputKafkaTopicPrefix = "recos_injector"
val log = Logger("ProdConfig")
val recosInjectorCoreSvcsCacheDest = "/srv#/prod/local/cache/recos_metadata"
val recosInjectorDecider = RecosInjectorDecider(
isProd = true,
dataCenter = serviceIdentifier.zone
)
} with DeployConfig

View File

@ -1,33 +0,0 @@
package com.twitter.recosinjector.config
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.thrift.ClientId
import com.twitter.logging.Logger
import com.twitter.recosinjector.decider.RecosInjectorDecider
case class StagingConfig(
override val serviceIdentifier: ServiceIdentifier
)(
implicit val statsReceiver: StatsReceiver)
extends {
// Due to trait initialization logic in Scala, any abstract members declared in Config or
// DeployConfig should be declared in this block. Otherwise the abstract member might initialize
// to null if invoked before before object creation finishing.
val recosInjectorThriftClientId = ClientId("recos-injector.staging")
val outputKafkaTopicPrefix = "staging_recos_injector"
val log = Logger("StagingConfig")
val recosInjectorCoreSvcsCacheDest = "/srv#/test/local/cache/twemcache_recos"
val recosInjectorDecider = RecosInjectorDecider(
isProd = false,
dataCenter = serviceIdentifier.zone
)
val abDeciderLoggerNode = "staging_abdecider_scribe"
} with DeployConfig

View File

@ -1,7 +0,0 @@
scala_library(
platform = "java11",
tags = ["bazel-compatible"],
dependencies = [
"decider/src/main/scala",
],
)

View File

@ -1,33 +0,0 @@
package com.twitter.recosinjector.decider
import com.twitter.decider.{Decider, DeciderFactory, RandomRecipient, Recipient}
case class RecosInjectorDecider(isProd: Boolean, dataCenter: String) {
lazy val decider: Decider = DeciderFactory(
Some("config/decider.yml"),
Some(getOverlayPath(isProd, dataCenter))
)()
private def getOverlayPath(isProd: Boolean, dataCenter: String): String = {
if (isProd) {
s"/usr/local/config/overlays/recos-injector/recos-injector/prod/$dataCenter/decider_overlay.yml"
} else {
s"/usr/local/config/overlays/recos-injector/recos-injector/staging/$dataCenter/decider_overlay.yml"
}
}
def getDecider: Decider = decider
def isAvailable(feature: String, recipient: Option[Recipient]): Boolean = {
decider.isAvailable(feature, recipient)
}
def isAvailable(feature: String): Boolean = isAvailable(feature, Some(RandomRecipient))
}
object RecosInjectorDeciderConstants {
val TweetEventTransformerUserTweetEntityEdgesDecider =
"tweet_event_transformer_user_tweet_entity_edges"
val EnableEmitTweetEdgeFromReply = "enable_emit_tweet_edge_from_reply"
val EnableUnfavoriteEdge = "enable_unfavorite_edge"
}

View File

@ -1,23 +0,0 @@
scala_library(
platform = "java11",
strict_deps = False,
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/twitter/storehaus:core",
"finagle/finagle-stats",
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store",
"hermit/hermit-core/src/main/scala/com/twitter/hermit/predicate/socialgraph",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/clients",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/decider",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/filters",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/util",
"src/scala/com/twitter/recos/util:recos-util",
"src/thrift/com/twitter/recos:recos-injector-scala",
"src/thrift/com/twitter/recos:recos-internal-scala",
"src/thrift/com/twitter/socialgraph:thrift-scala",
"src/thrift/com/twitter/timelineservice/server/internal:thrift-scala",
"src/thrift/com/twitter/tweetypie:events-scala",
"src/thrift/com/twitter/tweetypie:tweet-scala",
],
)

View File

@ -1,87 +0,0 @@
package com.twitter.recosinjector.edges
import com.twitter.recos.internal.thriftscala.RecosHoseMessage
import com.twitter.recos.recos_injector.thriftscala.{Features, UserTweetAuthorGraphMessage}
import com.twitter.recos.util.Action.Action
import com.twitter.recosinjector.util.TweetDetails
import scala.collection.Map
trait Edge {
// RecosHoseMessage is the thrift struct that the graphs consume.
def convertToRecosHoseMessage: RecosHoseMessage
// UserTweetAuthorGraphMessage is the thrift struct that user_tweet_author_graph consumes.
def convertToUserTweetAuthorGraphMessage: UserTweetAuthorGraphMessage
}
/**
* Edge corresponding to UserTweetEntityEdge.
* It captures user-tweet interactions: Create, Like, Retweet, Reply etc.
*/
case class UserTweetEntityEdge(
sourceUser: Long,
targetTweet: Long,
action: Action,
cardInfo: Option[Byte],
metadata: Option[Long],
entitiesMap: Option[Map[Byte, Seq[Int]]],
tweetDetails: Option[TweetDetails])
extends Edge {
override def convertToRecosHoseMessage: RecosHoseMessage = {
RecosHoseMessage(
leftId = sourceUser,
rightId = targetTweet,
action = action.id.toByte,
card = cardInfo,
entities = entitiesMap,
edgeMetadata = metadata
)
}
private def getFeatures(tweetDetails: TweetDetails): Features = {
Features(
hasPhoto = Some(tweetDetails.hasPhoto),
hasVideo = Some(tweetDetails.hasVideo),
hasUrl = Some(tweetDetails.hasUrl),
hasHashtag = Some(tweetDetails.hasHashtag)
)
}
override def convertToUserTweetAuthorGraphMessage: UserTweetAuthorGraphMessage = {
UserTweetAuthorGraphMessage(
leftId = sourceUser,
rightId = targetTweet,
action = action.id.toByte,
card = cardInfo,
authorId = tweetDetails.flatMap(_.authorId),
features = tweetDetails.map(getFeatures)
)
}
}
/**
* Edge corresponding to UserUserGraph.
* It captures user-user interactions: Follow, Mention, Mediatag.
*/
case class UserUserEdge(
sourceUser: Long,
targetUser: Long,
action: Action,
metadata: Option[Long])
extends Edge {
override def convertToRecosHoseMessage: RecosHoseMessage = {
RecosHoseMessage(
leftId = sourceUser,
rightId = targetUser,
action = action.id.toByte,
edgeMetadata = metadata
)
}
override def convertToUserTweetAuthorGraphMessage: UserTweetAuthorGraphMessage = {
throw new RuntimeException(
"convertToUserTweetAuthorGraphMessage not implemented in UserUserEdge.")
}
}

View File

@ -1,82 +0,0 @@
package com.twitter.recosinjector.edges
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.frigate.common.base.Stats.track
import com.twitter.util.Future
/**
* This is the generic interface that converts incoming Events (ex. TweetEvent, FavEvent, etc)
* into Edge for a specific output graph. It applies the following flow:
*
* event -> update event stats -> build edges -> filter edges
*
* Top-level statistics are provided for each step, such as latency and number of events
*/
trait EventToMessageBuilder[Event, E <: Edge] {
implicit val statsReceiver: StatsReceiver
private lazy val processEventStats = statsReceiver.scope("process_event")
private lazy val numEventsStats = statsReceiver.counter("num_process_event")
private lazy val rejectEventStats = statsReceiver.counter("num_reject_event")
private lazy val buildEdgesStats = statsReceiver.scope("build")
private lazy val numAllEdgesStats = buildEdgesStats.counter("num_all_edges")
private lazy val filterEdgesStats = statsReceiver.scope("filter")
private lazy val numValidEdgesStats = statsReceiver.counter("num_valid_edges")
private lazy val numRecosHoseMessageStats = statsReceiver.counter("num_RecosHoseMessage")
/**
* Given an incoming event, process and convert it into a sequence of RecosHoseMessages
* @param event
* @return
*/
def processEvent(event: Event): Future[Seq[Edge]] = {
track(processEventStats) {
shouldProcessEvent(event).flatMap {
case true =>
numEventsStats.incr()
updateEventStatus(event)
for {
allEdges <- track(buildEdgesStats)(buildEdges(event))
filteredEdges <- track(filterEdgesStats)(filterEdges(event, allEdges))
} yield {
numAllEdgesStats.incr(allEdges.size)
numValidEdgesStats.incr(filteredEdges.size)
numRecosHoseMessageStats.incr(filteredEdges.size)
filteredEdges
}
case false =>
rejectEventStats.incr()
Future.Nil
}
}
}
/**
* Pre-process filter that determines whether the given event should be used to build edges.
* @param event
* @return
*/
def shouldProcessEvent(event: Event): Future[Boolean]
/**
* Update cache/event logging related to the specific event.
* By default, no action will be taken. Override when necessary
* @param event
*/
def updateEventStatus(event: Event): Unit = {}
/**
* Given an event, extract info and build a sequence of edges
* @param event
* @return
*/
def buildEdges(event: Event): Future[Seq[E]]
/**
* Given a sequence of edges, filter and return the valid edges
* @param event
* @param edges
* @return
*/
def filterEdges(event: Event, edges: Seq[E]): Future[Seq[E]]
}

View File

@ -1,73 +0,0 @@
package com.twitter.recosinjector.edges
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.recos.util.Action
import com.twitter.socialgraph.thriftscala.{
Action => SocialGraphAction,
FollowGraphEvent,
FollowType,
WriteEvent
}
import com.twitter.util.Future
/**
* Converts a WriteEvent to UserUserGraph's messages, including Mention and Mediatag messages
*/
class SocialWriteEventToUserUserGraphBuilder()(override implicit val statsReceiver: StatsReceiver)
extends EventToMessageBuilder[WriteEvent, UserUserEdge] {
private val followOrFrictionlessFollowCounter =
statsReceiver.counter("num_follow_or_frictionless")
private val notFollowOrFrictionlessFollowCounter =
statsReceiver.counter("num_not_follow_or_frictionless")
private val followEdgeCounter = statsReceiver.counter("num_follow_edge")
/**
* For now, we are only interested in Follow events
*/
override def shouldProcessEvent(event: WriteEvent): Future[Boolean] = {
event.action match {
case SocialGraphAction.Follow | SocialGraphAction.FrictionlessFollow =>
followOrFrictionlessFollowCounter.incr()
Future(true)
case _ =>
notFollowOrFrictionlessFollowCounter.incr()
Future(false)
}
}
/**
* Determine whether a Follow event is valid/error free.
*/
private def isValidFollowEvent(followEvent: FollowGraphEvent): Boolean = {
followEvent.followType match {
case Some(FollowType.NormalFollow) | Some(FollowType.FrictionlessFollow) =>
followEvent.result.validationError.isEmpty
case _ =>
false
}
}
override def buildEdges(event: WriteEvent): Future[Seq[UserUserEdge]] = {
val userUserEdges = event.follow
.map(_.collect {
case followEvent if isValidFollowEvent(followEvent) =>
val sourceUserId = followEvent.result.request.source
val targetUserId = followEvent.result.request.target
followEdgeCounter.incr()
UserUserEdge(
sourceUserId,
targetUserId,
Action.Follow,
Some(System.currentTimeMillis())
)
}).getOrElse(Nil)
Future(userUserEdges)
}
override def filterEdges(
event: WriteEvent,
edges: Seq[UserUserEdge]
): Future[Seq[UserUserEdge]] = {
Future(edges)
}
}

View File

@ -1,60 +0,0 @@
package com.twitter.recosinjector.edges
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.recos.util.Action
import com.twitter.recosinjector.util.TweetFavoriteEventDetails
import com.twitter.util.Future
class TimelineEventToUserTweetEntityGraphBuilder(
userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder
)(
override implicit val statsReceiver: StatsReceiver)
extends EventToMessageBuilder[TweetFavoriteEventDetails, UserTweetEntityEdge] {
private val numFavEdgeCounter = statsReceiver.counter("num_favorite_edge")
private val numUnfavEdgeCounter = statsReceiver.counter("num_unfavorite_edge")
override def shouldProcessEvent(event: TweetFavoriteEventDetails): Future[Boolean] = {
Future(true)
}
override def buildEdges(details: TweetFavoriteEventDetails): Future[Seq[UserTweetEntityEdge]] = {
val engagement = details.userTweetEngagement
val tweetDetails = engagement.tweetDetails
val entitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache(
tweetId = engagement.tweetId,
tweetDetails = tweetDetails
)
entitiesMapFut
.map { entitiesMap =>
UserTweetEntityEdge(
sourceUser = engagement.engageUserId,
targetTweet = engagement.tweetId,
action = engagement.action,
metadata = engagement.engagementTimeMillis,
cardInfo = engagement.tweetDetails.map(_.cardInfo.toByte),
entitiesMap = entitiesMap,
tweetDetails = tweetDetails
)
}
.map { edge =>
edge match {
case fav if fav.action == Action.Favorite =>
numFavEdgeCounter.incr()
case unfav if unfav.action == Action.Unfavorite =>
numUnfavEdgeCounter.incr()
case _ =>
}
Seq(edge)
}
}
override def filterEdges(
event: TweetFavoriteEventDetails,
edges: Seq[UserTweetEntityEdge]
): Future[Seq[UserTweetEntityEdge]] = {
Future(edges)
}
}

View File

@ -1,54 +0,0 @@
package com.twitter.recosinjector.edges
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.recos.util.Action
import com.twitter.recosinjector.util.TweetFavoriteEventDetails
import com.twitter.util.Future
class TimelineEventToUserTweetGraphBuilder(
userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder
)(
override implicit val statsReceiver: StatsReceiver)
extends EventToMessageBuilder[TweetFavoriteEventDetails, UserTweetEntityEdge] {
override def shouldProcessEvent(event: TweetFavoriteEventDetails): Future[Boolean] = {
Future(true)
}
override def buildEdges(details: TweetFavoriteEventDetails): Future[Seq[UserTweetEntityEdge]] = {
val engagement = details.userTweetEngagement
engagement.action match {
case Action.Favorite =>
val tweetDetails = engagement.tweetDetails
val entitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache(
tweetId = engagement.tweetId,
tweetDetails = tweetDetails
)
entitiesMapFut
.map { entitiesMap =>
UserTweetEntityEdge(
sourceUser = engagement.engageUserId,
targetTweet = engagement.tweetId,
action = engagement.action,
metadata = engagement.engagementTimeMillis,
cardInfo = engagement.tweetDetails.map(_.cardInfo.toByte),
entitiesMap = entitiesMap,
tweetDetails = tweetDetails
)
}
.map(Seq(_))
case _ => Future.Nil
}
}
override def filterEdges(
event: TweetFavoriteEventDetails,
edges: Seq[UserTweetEntityEdge]
): Future[Seq[UserTweetEntityEdge]] = {
Future(edges)
}
}

View File

@ -1,343 +0,0 @@
package com.twitter.recosinjector.edges
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.frigate.common.store.TweetCreationTimeMHStore
import com.twitter.frigate.common.util.SnowflakeUtils
import com.twitter.recos.internal.thriftscala.{RecosUserTweetInfo, TweetType}
import com.twitter.recos.util.Action
import com.twitter.recosinjector.decider.RecosInjectorDecider
import com.twitter.recosinjector.decider.RecosInjectorDeciderConstants
import com.twitter.recosinjector.util.TweetCreateEventDetails
import com.twitter.util.{Future, Time}
class TweetEventToUserTweetEntityGraphBuilder(
userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder,
tweetCreationStore: TweetCreationTimeMHStore,
decider: RecosInjectorDecider
)(
override implicit val statsReceiver: StatsReceiver)
extends EventToMessageBuilder[TweetCreateEventDetails, UserTweetEntityEdge] {
// TweetCreationStore counters
private val lastTweetTimeNotInMh = statsReceiver.counter("last_tweet_time_not_in_mh")
private val tweetCreationStoreInserts = statsReceiver.counter("tweet_creation_store_inserts")
private val numInvalidActionCounter = statsReceiver.counter("num_invalid_tweet_action")
private val numTweetEdgesCounter = statsReceiver.counter("num_tweet_edge")
private val numRetweetEdgesCounter = statsReceiver.counter("num_retweet_edge")
private val numReplyEdgesCounter = statsReceiver.counter("num_reply_edge")
private val numQuoteEdgesCounter = statsReceiver.counter("num_quote_edge")
private val numIsMentionedEdgesCounter = statsReceiver.counter("num_isMentioned_edge")
private val numIsMediataggedEdgesCounter = statsReceiver.counter("num_isMediatagged_edge")
private val numIsDecider = statsReceiver.counter("num_decider_enabled")
private val numIsNotDecider = statsReceiver.counter("num_decider_not_enabled")
override def shouldProcessEvent(event: TweetCreateEventDetails): Future[Boolean] = {
val isDecider = decider.isAvailable(
RecosInjectorDeciderConstants.TweetEventTransformerUserTweetEntityEdgesDecider
)
if (isDecider) {
numIsDecider.incr()
Future(true)
} else {
numIsNotDecider.incr()
Future(false)
}
}
/**
* Build edges Reply event. Reply event emits 2 edges:
* author -> Reply -> SourceTweetId
* author -> Tweet -> ReplyId
* Do not associate entities in reply tweet to the source tweet
*/
private def buildReplyEdge(event: TweetCreateEventDetails) = {
val userTweetEngagement = event.userTweetEngagement
val authorId = userTweetEngagement.engageUserId
val replyEdgeFut = event.sourceTweetDetails
.map { sourceTweetDetails =>
val sourceTweetId = sourceTweetDetails.tweet.id
val sourceTweetEntitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache(
tweetId = sourceTweetId,
tweetDetails = Some(sourceTweetDetails)
)
sourceTweetEntitiesMapFut.map { sourceTweetEntitiesMap =>
val replyEdge = UserTweetEntityEdge(
sourceUser = authorId,
targetTweet = sourceTweetId,
action = Action.Reply,
metadata = Some(userTweetEngagement.tweetId),
cardInfo = Some(sourceTweetDetails.cardInfo.toByte),
entitiesMap = sourceTweetEntitiesMap,
tweetDetails = Some(sourceTweetDetails)
)
numReplyEdgesCounter.incr()
Some(replyEdge)
}
}.getOrElse(Future.None)
val tweetCreationEdgeFut =
if (decider.isAvailable(RecosInjectorDeciderConstants.EnableEmitTweetEdgeFromReply)) {
getAndUpdateLastTweetCreationTime(
authorId = authorId,
tweetId = userTweetEngagement.tweetId,
tweetType = TweetType.Reply
).map { lastTweetTime =>
val edge = UserTweetEntityEdge(
sourceUser = authorId,
targetTweet = userTweetEngagement.tweetId,
action = Action.Tweet,
metadata = lastTweetTime,
cardInfo = userTweetEngagement.tweetDetails.map(_.cardInfo.toByte),
entitiesMap = None,
tweetDetails = userTweetEngagement.tweetDetails
)
numTweetEdgesCounter.incr()
Some(edge)
}
} else {
Future.None
}
Future.join(replyEdgeFut, tweetCreationEdgeFut).map {
case (replyEdgeOpt, tweetCreationEdgeOpt) =>
tweetCreationEdgeOpt.toSeq ++ replyEdgeOpt.toSeq
}
}
/**
* Build a Retweet UTEG edge: author -> RT -> SourceTweetId.
*/
private def buildRetweetEdge(event: TweetCreateEventDetails) = {
val userTweetEngagement = event.userTweetEngagement
val tweetId = userTweetEngagement.tweetId
event.sourceTweetDetails
.map { sourceTweetDetails =>
val sourceTweetId = sourceTweetDetails.tweet.id // Id of the tweet being Retweeted
val sourceTweetEntitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache(
tweetId = sourceTweetId,
tweetDetails = Some(sourceTweetDetails)
)
sourceTweetEntitiesMapFut.map { sourceTweetEntitiesMap =>
val edge = UserTweetEntityEdge(
sourceUser = userTweetEngagement.engageUserId,
targetTweet = sourceTweetId,
action = Action.Retweet,
metadata = Some(tweetId), // metadata is the tweetId
cardInfo = Some(sourceTweetDetails.cardInfo.toByte),
entitiesMap = sourceTweetEntitiesMap,
tweetDetails = Some(sourceTweetDetails)
)
numRetweetEdgesCounter.incr()
Seq(edge)
}
}.getOrElse(Future.Nil)
}
/**
* Build edges for a Quote event. Quote tweet emits 2 edges:
* 1. A quote social proof: author -> Quote -> SourceTweetId
* 2. A tweet creation edge: author -> Tweet -> QuoteTweetId
*/
private def buildQuoteEdges(
event: TweetCreateEventDetails
): Future[Seq[UserTweetEntityEdge]] = {
val userTweetEngagement = event.userTweetEngagement
val tweetId = userTweetEngagement.tweetId
val authorId = userTweetEngagement.engageUserId
// do not associate entities in quote tweet to the source tweet,
// but associate entities to quote tweet in tweet creation event
val quoteTweetEdgeFut = event.sourceTweetDetails
.map { sourceTweetDetails =>
val sourceTweetId = sourceTweetDetails.tweet.id // Id of the tweet being quoted
val sourceTweetEntitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache(
tweetId = sourceTweetId,
tweetDetails = event.sourceTweetDetails
)
sourceTweetEntitiesMapFut.map { sourceTweetEntitiesMap =>
val edge = UserTweetEntityEdge(
sourceUser = authorId,
targetTweet = sourceTweetId,
action = Action.Quote,
metadata = Some(tweetId), // metadata is tweetId
cardInfo = Some(sourceTweetDetails.cardInfo.toByte), // cardInfo of the source tweet
entitiesMap = sourceTweetEntitiesMap,
tweetDetails = Some(sourceTweetDetails)
)
numQuoteEdgesCounter.incr()
Seq(edge)
}
}.getOrElse(Future.Nil)
val tweetCreationEdgeFut = getAndUpdateLastTweetCreationTime(
authorId = authorId,
tweetId = tweetId,
tweetType = TweetType.Quote
).map { lastTweetTime =>
val metadata = lastTweetTime
val cardInfo = userTweetEngagement.tweetDetails.map(_.cardInfo.toByte)
val edge = UserTweetEntityEdge(
sourceUser = authorId,
targetTweet = tweetId,
action = Action.Tweet,
metadata = metadata,
cardInfo = cardInfo,
entitiesMap = None,
tweetDetails = userTweetEngagement.tweetDetails
)
numTweetEdgesCounter.incr()
Seq(edge)
}
Future.join(quoteTweetEdgeFut, tweetCreationEdgeFut).map {
case (quoteEdge, creationEdge) =>
quoteEdge ++ creationEdge
}
}
/**
* Build edges for a Tweet event. A Tweet emits 3 tyes edges:
* 1. A tweet creation edge: author -> Tweet -> TweetId
* 2. IsMentioned edges: mentionedUserId -> IsMentioned -> TweetId
* 3. IsMediatagged edges: mediataggedUserId -> IsMediatagged -> TweetId
*/
private def buildTweetEdges(event: TweetCreateEventDetails): Future[Seq[UserTweetEntityEdge]] = {
val userTweetEngagement = event.userTweetEngagement
val tweetDetails = userTweetEngagement.tweetDetails
val tweetId = userTweetEngagement.tweetId
val authorId = userTweetEngagement.engageUserId
val cardInfo = tweetDetails.map(_.cardInfo.toByte)
val entitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache(
tweetId = tweetId,
tweetDetails = tweetDetails
)
val lastTweetTimeFut = getAndUpdateLastTweetCreationTime(
authorId = authorId,
tweetId = tweetId,
tweetType = TweetType.Tweet
)
Future.join(entitiesMapFut, lastTweetTimeFut).map {
case (entitiesMap, lastTweetTime) =>
val tweetCreationEdge = UserTweetEntityEdge(
sourceUser = authorId,
targetTweet = tweetId,
action = Action.Tweet,
metadata = lastTweetTime,
cardInfo = cardInfo,
entitiesMap = entitiesMap,
tweetDetails = userTweetEngagement.tweetDetails
)
numTweetEdgesCounter.incr()
val isMentionedEdges = event.validMentionUserIds
.map(_.map { mentionedUserId =>
UserTweetEntityEdge(
sourceUser = mentionedUserId,
targetTweet = tweetId,
action = Action.IsMentioned,
metadata = Some(tweetId),
cardInfo = cardInfo,
entitiesMap = entitiesMap,
tweetDetails = userTweetEngagement.tweetDetails
)
}).getOrElse(Nil)
numIsMentionedEdgesCounter.incr(isMentionedEdges.size)
val isMediataggedEdges = event.validMediatagUserIds
.map(_.map { mediataggedUserId =>
UserTweetEntityEdge(
sourceUser = mediataggedUserId,
targetTweet = tweetId,
action = Action.IsMediaTagged,
metadata = Some(tweetId),
cardInfo = cardInfo,
entitiesMap = entitiesMap,
tweetDetails = userTweetEngagement.tweetDetails
)
}).getOrElse(Nil)
numIsMediataggedEdgesCounter.incr(isMediataggedEdges.size)
Seq(tweetCreationEdge) ++ isMentionedEdges ++ isMediataggedEdges
}
}
/**
* For a given user, read the user's last time tweeted from the MH store, and
* write the new tweet time into the MH store before returning.
* Note this function is async, so the MH write operations will continue to execute on its own.
* This might create a read/write race condition, but it's expected.
*/
private def getAndUpdateLastTweetCreationTime(
authorId: Long,
tweetId: Long,
tweetType: TweetType
): Future[Option[Long]] = {
val newTweetInfo = RecosUserTweetInfo(
authorId,
tweetId,
tweetType,
SnowflakeUtils.tweetCreationTime(tweetId).map(_.inMillis).getOrElse(Time.now.inMillis)
)
tweetCreationStore
.get(authorId)
.map(_.map { previousTweetInfoSeq =>
val lastTweetTime = previousTweetInfoSeq
.filter(info => info.tweetType == TweetType.Tweet || info.tweetType == TweetType.Quote)
.map(_.tweetTimestamp)
.sortBy(-_)
.headOption // Fetch the latest time user Tweeted or Quoted
.getOrElse(
Time.Bottom.inMillis
) // Last tweet time never recorded in MH, default to oldest point in time
if (lastTweetTime == Time.Bottom.inMillis) lastTweetTimeNotInMh.incr()
lastTweetTime
})
.ensure {
tweetCreationStore
.put(authorId, newTweetInfo)
.onSuccess(_ => tweetCreationStoreInserts.incr())
.onFailure { e =>
statsReceiver.counter("write_failed_with_ex:" + e.getClass.getName).incr()
}
}
}
override def buildEdges(event: TweetCreateEventDetails): Future[Seq[UserTweetEntityEdge]] = {
val userTweetEngagement = event.userTweetEngagement
userTweetEngagement.action match {
case Action.Reply =>
buildReplyEdge(event)
case Action.Retweet =>
buildRetweetEdge(event)
case Action.Tweet =>
buildTweetEdges(event)
case Action.Quote =>
buildQuoteEdges(event)
case _ =>
numInvalidActionCounter.incr()
Future.Nil
}
}
override def filterEdges(
event: TweetCreateEventDetails,
edges: Seq[UserTweetEntityEdge]
): Future[Seq[UserTweetEntityEdge]] = {
Future(edges) // No filtering for now. Add more if needed
}
}

View File

@ -1,88 +0,0 @@
package com.twitter.recosinjector.edges
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.frigate.common.store.TweetCreationTimeMHStore
import com.twitter.frigate.common.util.SnowflakeUtils
import com.twitter.recos.internal.thriftscala.RecosUserTweetInfo
import com.twitter.recos.internal.thriftscala.TweetType
import com.twitter.recos.util.Action
import com.twitter.recosinjector.decider.RecosInjectorDecider
import com.twitter.recosinjector.decider.RecosInjectorDeciderConstants
import com.twitter.recosinjector.util.TweetCreateEventDetails
import com.twitter.util.Future
import com.twitter.util.Time
class TweetEventToUserTweetGraphBuilder(
userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder,
tweetCreationStore: TweetCreationTimeMHStore,
decider: RecosInjectorDecider
)(
override implicit val statsReceiver: StatsReceiver)
extends EventToMessageBuilder[TweetCreateEventDetails, UserTweetEntityEdge] {
private val numRetweetEdgesCounter = statsReceiver.counter("num_retweet_edge")
private val numIsDecider = statsReceiver.counter("num_decider_enabled")
private val numIsNotDecider = statsReceiver.counter("num_decider_not_enabled")
override def shouldProcessEvent(event: TweetCreateEventDetails): Future[Boolean] = {
val isDecider = decider.isAvailable(
RecosInjectorDeciderConstants.TweetEventTransformerUserTweetEntityEdgesDecider
)
if (isDecider) {
numIsDecider.incr()
Future(true)
} else {
numIsNotDecider.incr()
Future(false)
}
}
/**
* Build a Retweet edge: author -> RT -> SourceTweetId.
*/
private def buildRetweetEdge(event: TweetCreateEventDetails) = {
val userTweetEngagement = event.userTweetEngagement
val tweetId = userTweetEngagement.tweetId
event.sourceTweetDetails
.map { sourceTweetDetails =>
val sourceTweetId = sourceTweetDetails.tweet.id // Id of the tweet being Retweeted
val sourceTweetEntitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache(
tweetId = sourceTweetId,
tweetDetails = Some(sourceTweetDetails)
)
sourceTweetEntitiesMapFut.map { sourceTweetEntitiesMap =>
val edge = UserTweetEntityEdge(
sourceUser = userTweetEngagement.engageUserId,
targetTweet = sourceTweetId,
action = Action.Retweet,
metadata = Some(tweetId), // metadata is the tweetId
cardInfo = Some(sourceTweetDetails.cardInfo.toByte),
entitiesMap = sourceTweetEntitiesMap,
tweetDetails = Some(sourceTweetDetails)
)
numRetweetEdgesCounter.incr()
Seq(edge)
}
}.getOrElse(Future.Nil)
}
override def buildEdges(event: TweetCreateEventDetails): Future[Seq[UserTweetEntityEdge]] = {
val userTweetEngagement = event.userTweetEngagement
userTweetEngagement.action match {
case Action.Retweet =>
buildRetweetEdge(event)
case _ =>
Future.Nil
}
}
override def filterEdges(
event: TweetCreateEventDetails,
edges: Seq[UserTweetEntityEdge]
): Future[Seq[UserTweetEntityEdge]] = {
Future(edges) // No filtering for now. Add more if needed
}
}

View File

@ -1,65 +0,0 @@
package com.twitter.recosinjector.edges
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.recos.util.Action
import com.twitter.recosinjector.util.TweetCreateEventDetails
import com.twitter.util.Future
/**
* Given a tweet creation event, parse for UserUserGraph edges. Specifically, when a new tweet is
* created, extract the valid mentioned and mediatagged users in the tweet and create edges for them
*/
class TweetEventToUserUserGraphBuilder(
)(
override implicit val statsReceiver: StatsReceiver)
extends EventToMessageBuilder[TweetCreateEventDetails, UserUserEdge] {
private val tweetOrQuoteEventCounter = statsReceiver.counter("num_tweet_or_quote_event")
private val nonTweetOrQuoteEventCounter = statsReceiver.counter("num_non_tweet_or_quote_event")
private val mentionEdgeCounter = statsReceiver.counter("num_mention_edge")
private val mediatagEdgeCounter = statsReceiver.counter("num_mediatag_edge")
override def shouldProcessEvent(event: TweetCreateEventDetails): Future[Boolean] = {
// For user interactions, only new tweets and quotes are considered (no replies or retweets)
event.userTweetEngagement.action match {
case Action.Tweet | Action.Quote =>
tweetOrQuoteEventCounter.incr()
Future(true)
case _ =>
nonTweetOrQuoteEventCounter.incr()
Future(false)
}
}
override def buildEdges(event: TweetCreateEventDetails): Future[Seq[UserUserEdge]] = {
val mentionEdges = event.validMentionUserIds
.map(_.map { mentionUserId =>
UserUserEdge(
sourceUser = event.userTweetEngagement.engageUserId,
targetUser = mentionUserId,
action = Action.Mention,
metadata = Some(System.currentTimeMillis())
)
}).getOrElse(Nil)
val mediatagEdges = event.validMediatagUserIds
.map(_.map { mediatagUserId =>
UserUserEdge(
sourceUser = event.userTweetEngagement.engageUserId,
targetUser = mediatagUserId,
action = Action.MediaTag,
metadata = Some(System.currentTimeMillis())
)
}).getOrElse(Nil)
mentionEdgeCounter.incr(mentionEdges.size)
mediatagEdgeCounter.incr(mediatagEdges.size)
Future(mentionEdges ++ mediatagEdges)
}
override def filterEdges(
event: TweetCreateEventDetails,
edges: Seq[UserUserEdge]
): Future[Seq[UserUserEdge]] = {
Future(edges)
}
}

View File

@ -1,44 +0,0 @@
package com.twitter.recosinjector.edges
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.recos.util.Action
import com.twitter.recosinjector.util.UuaEngagementEventDetails
import com.twitter.util.Future
class UnifiedUserActionToUserAdGraphBuilder(
userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder
)(
override implicit val statsReceiver: StatsReceiver)
extends EventToMessageBuilder[UuaEngagementEventDetails, UserTweetEntityEdge] {
override def shouldProcessEvent(event: UuaEngagementEventDetails): Future[Boolean] = {
event.userTweetEngagement.action match {
case Action.Click | Action.VideoPlayback75 | Action.Favorite => Future(true)
case _ => Future(false)
}
}
override def buildEdges(details: UuaEngagementEventDetails): Future[Seq[UserTweetEntityEdge]] = {
val engagement = details.userTweetEngagement
val tweetDetails = engagement.tweetDetails
Future.value(
Seq(
UserTweetEntityEdge(
sourceUser = engagement.engageUserId,
targetTweet = engagement.tweetId,
action = engagement.action,
metadata = engagement.engagementTimeMillis,
cardInfo = engagement.tweetDetails.map(_.cardInfo.toByte),
entitiesMap = None,
tweetDetails = tweetDetails
)))
}
override def filterEdges(
event: UuaEngagementEventDetails,
edges: Seq[UserTweetEntityEdge]
): Future[Seq[UserTweetEntityEdge]] = {
Future(edges)
}
}

View File

@ -1,51 +0,0 @@
package com.twitter.recosinjector.edges
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.recos.util.Action
import com.twitter.recosinjector.util.UuaEngagementEventDetails
import com.twitter.util.Future
class UnifiedUserActionToUserTweetGraphPlusBuilder(
userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder
)(
override implicit val statsReceiver: StatsReceiver)
extends EventToMessageBuilder[UuaEngagementEventDetails, UserTweetEntityEdge] {
override def shouldProcessEvent(event: UuaEngagementEventDetails): Future[Boolean] = {
event.userTweetEngagement.action match {
case Action.Click | Action.VideoQualityView => Future(true)
case Action.Favorite | Action.Retweet | Action.Share => Future(true)
case Action.NotificationOpen | Action.EmailClick => Future(true)
case Action.Quote | Action.Reply => Future(true)
case Action.TweetNotInterestedIn | Action.TweetNotRelevant | Action.TweetSeeFewer |
Action.TweetReport | Action.TweetMuteAuthor | Action.TweetBlockAuthor =>
Future(true)
case _ => Future(false)
}
}
override def buildEdges(details: UuaEngagementEventDetails): Future[Seq[UserTweetEntityEdge]] = {
val engagement = details.userTweetEngagement
val tweetDetails = engagement.tweetDetails
Future
.value(
UserTweetEntityEdge(
sourceUser = engagement.engageUserId,
targetTweet = engagement.tweetId,
action = engagement.action,
metadata = engagement.engagementTimeMillis,
cardInfo = engagement.tweetDetails.map(_.cardInfo.toByte),
entitiesMap = None,
tweetDetails = tweetDetails
)
).map(Seq(_))
}
override def filterEdges(
event: UuaEngagementEventDetails,
edges: Seq[UserTweetEntityEdge]
): Future[Seq[UserTweetEntityEdge]] = {
Future(edges)
}
}

View File

@ -1,56 +0,0 @@
package com.twitter.recosinjector.edges
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.recos.util.Action
import com.twitter.recosinjector.util.UuaEngagementEventDetails
import com.twitter.util.Future
class UnifiedUserActionToUserVideoGraphBuilder(
userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder
)(
override implicit val statsReceiver: StatsReceiver)
extends EventToMessageBuilder[UuaEngagementEventDetails, UserTweetEntityEdge] {
private val numVideoPlayback50EdgeCounter = statsReceiver.counter("num_video_playback50_edge")
private val numUnVideoPlayback50Counter = statsReceiver.counter("num_non_video_playback50_edge")
override def shouldProcessEvent(event: UuaEngagementEventDetails): Future[Boolean] = {
event.userTweetEngagement.action match {
case Action.VideoPlayback50 => Future(true)
case _ => Future(false)
}
}
override def buildEdges(details: UuaEngagementEventDetails): Future[Seq[UserTweetEntityEdge]] = {
val engagement = details.userTweetEngagement
val tweetDetails = engagement.tweetDetails
Future
.value(
UserTweetEntityEdge(
sourceUser = engagement.engageUserId,
targetTweet = engagement.tweetId,
action = engagement.action,
metadata = engagement.engagementTimeMillis,
cardInfo = engagement.tweetDetails.map(_.cardInfo.toByte),
entitiesMap = None,
tweetDetails = tweetDetails
)
).map { edge =>
edge match {
case videoPlayback50 if videoPlayback50.action == Action.VideoPlayback50 =>
numVideoPlayback50EdgeCounter.incr()
case _ =>
numUnVideoPlayback50Counter.incr()
}
Seq(edge)
}
}
override def filterEdges(
event: UuaEngagementEventDetails,
edges: Seq[UserTweetEntityEdge]
): Future[Seq[UserTweetEntityEdge]] = {
Future(edges)
}
}

View File

@ -1,80 +0,0 @@
package com.twitter.recosinjector.edges
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.graphjet.algorithms.RecommendationType
import com.twitter.recosinjector.clients.CacheEntityEntry
import com.twitter.recosinjector.clients.RecosHoseEntitiesCache
import com.twitter.recosinjector.clients.UrlResolver
import com.twitter.recosinjector.util.TweetDetails
import com.twitter.util.Future
import scala.collection.Map
import scala.util.hashing.MurmurHash3
class UserTweetEntityEdgeBuilder(
cache: RecosHoseEntitiesCache,
urlResolver: UrlResolver
)(
implicit val stats: StatsReceiver) {
def getHashedEntities(entities: Seq[String]): Seq[Int] = {
entities.map(MurmurHash3.stringHash)
}
/**
* Given the entities and their corresponding hashedIds, store the hashId->entity mapping into a
* cache.
* This is because UTEG edges only store the hashIds, and relies on the cache values to
* recover the actual entities. This allows us to store integer values instead of string in the
* edges to save space.
*/
private def storeEntitiesInCache(
urlEntities: Seq[String],
urlHashIds: Seq[Int]
): Future[Unit] = {
val urlCacheEntries = urlHashIds.zip(urlEntities).map {
case (hashId, url) =>
CacheEntityEntry(RecosHoseEntitiesCache.UrlPrefix, hashId, url)
}
cache.updateEntitiesCache(
newCacheEntries = urlCacheEntries,
stats = stats.scope("urlCache")
)
}
/**
* Return an entity mapping from GraphJet recType -> hash(entity)
*/
private def getEntitiesMap(
urlHashIds: Seq[Int]
) = {
val entitiesMap = Seq(
RecommendationType.URL.getValue.toByte -> urlHashIds
).collect {
case (keys, ids) if ids.nonEmpty => keys -> ids
}.toMap
if (entitiesMap.isEmpty) None else Some(entitiesMap)
}
def getEntitiesMapAndUpdateCache(
tweetId: Long,
tweetDetails: Option[TweetDetails]
): Future[Option[Map[Byte, Seq[Int]]]] = {
val resolvedUrlFut = urlResolver
.getResolvedUrls(
urls = tweetDetails.flatMap(_.urls).getOrElse(Nil),
tweetId = tweetId
).map(_.values.toSeq)
resolvedUrlFut.map { resolvedUrls =>
val urlEntities = resolvedUrls
val urlHashIds = getHashedEntities(urlEntities)
// Async call to cache
storeEntitiesInCache(
urlEntities = urlEntities,
urlHashIds = urlHashIds
)
getEntitiesMap(urlHashIds)
}
}
}

View File

@ -1,20 +0,0 @@
scala_library(
platform = "java11",
strict_deps = False,
tags = ["bazel-compatible"],
dependencies = [
"eventbus/client",
"recos-injector/server/config",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/clients",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/config",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/decider",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/edges",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers",
"src/thrift/com/twitter/clientapp/gen:clientapp-scala",
"src/thrift/com/twitter/gizmoduck:user-thrift-scala",
"src/thrift/com/twitter/socialgraph:thrift-scala",
"src/thrift/com/twitter/timelineservice/server/internal:thrift-scala",
"src/thrift/com/twitter/tweetypie:events-scala",
"src/thrift/com/twitter/tweetypie:tweet-scala",
],
)

View File

@ -1,60 +0,0 @@
package com.twitter.recosinjector.event_processors
import com.twitter.eventbus.client.{EventBusSubscriber, EventBusSubscriberBuilder}
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.logging.Logger
import com.twitter.scrooge.{ThriftStruct, ThriftStructCodec}
import com.twitter.util.Future
/**
* Main processor class that handles incoming EventBus events, which take forms of a ThriftStruct.
* This class is responsible for setting up the EventBus streams, and provides a processEvent()
* where child classes can decide what to do with incoming events
*/
trait EventBusProcessor[Event <: ThriftStruct] {
private val log = Logger()
implicit def statsReceiver: StatsReceiver
/**
* Full name of the EventBus stream this processor listens to
*/
val eventBusStreamName: String
/**
* the thriftStruct definition of the objects passed in from the EventBus streams, such as
* TweetEvent, WriteEvent, etc.
*/
val thriftStruct: ThriftStructCodec[Event]
val serviceIdentifier: ServiceIdentifier
def processEvent(event: Event): Future[Unit]
private def getEventBusSubscriberBuilder: EventBusSubscriberBuilder[Event] =
EventBusSubscriberBuilder()
.subscriberId(eventBusStreamName)
.serviceIdentifier(serviceIdentifier)
.thriftStruct(thriftStruct)
.numThreads(8)
.fromAllZones(true) // Receives traffic from all data centers
.skipToLatest(false) // Ensures we don't miss out on events during restart
.statsReceiver(statsReceiver)
// lazy val ensures the subscriber is only initialized when start() is called
private lazy val eventBusSubscriber = getEventBusSubscriberBuilder.build(processEvent)
def start(): EventBusSubscriber[Event] = eventBusSubscriber
def stop(): Unit = {
eventBusSubscriber
.close()
.onSuccess { _ =>
log.info(s"EventBus processor ${this.getClass.getSimpleName} is stopped")
}
.onFailure { ex: Throwable =>
log.error(ex, s"Exception while stopping EventBus processor ${this.getClass.getSimpleName}")
}
}
}

View File

@ -1,33 +0,0 @@
package com.twitter.recosinjector.event_processors
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.recosinjector.edges.{EventToMessageBuilder, UserUserEdge}
import com.twitter.recosinjector.publishers.KafkaEventPublisher
import com.twitter.scrooge.ThriftStructCodec
import com.twitter.socialgraph.thriftscala.WriteEvent
import com.twitter.util.Future
/**
* This processor listens to events from social graphs services. In particular, a major use case is
* to listen to user-user follow events.
*/
class SocialWriteEventProcessor(
override val eventBusStreamName: String,
override val thriftStruct: ThriftStructCodec[WriteEvent],
override val serviceIdentifier: ServiceIdentifier,
kafkaEventPublisher: KafkaEventPublisher,
userUserGraphTopic: String,
userUserGraphMessageBuilder: EventToMessageBuilder[WriteEvent, UserUserEdge]
)(
override implicit val statsReceiver: StatsReceiver)
extends EventBusProcessor[WriteEvent] {
override def processEvent(event: WriteEvent): Future[Unit] = {
userUserGraphMessageBuilder.processEvent(event).map { edges =>
edges.foreach { edge =>
kafkaEventPublisher.publish(edge.convertToRecosHoseMessage, userUserGraphTopic)
}
}
}
}

View File

@ -1,150 +0,0 @@
package com.twitter.recosinjector.event_processors
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.recos.util.Action
import com.twitter.recosinjector.clients.Gizmoduck
import com.twitter.recosinjector.clients.Tweetypie
import com.twitter.recosinjector.decider.RecosInjectorDecider
import com.twitter.recosinjector.decider.RecosInjectorDeciderConstants
import com.twitter.recosinjector.edges.TimelineEventToUserTweetEntityGraphBuilder
import com.twitter.recosinjector.filters.TweetFilter
import com.twitter.recosinjector.filters.UserFilter
import com.twitter.recosinjector.publishers.KafkaEventPublisher
import com.twitter.recosinjector.util.TweetDetails
import com.twitter.recosinjector.util.TweetFavoriteEventDetails
import com.twitter.recosinjector.util.UserTweetEngagement
import com.twitter.scrooge.ThriftStructCodec
import com.twitter.timelineservice.thriftscala.FavoriteEvent
import com.twitter.timelineservice.thriftscala.UnfavoriteEvent
import com.twitter.timelineservice.thriftscala.{Event => TimelineEvent}
import com.twitter.util.Future
/**
* Processor for Timeline events, such as Favorite (liking) tweets
*/
class TimelineEventProcessor(
override val eventBusStreamName: String,
override val thriftStruct: ThriftStructCodec[TimelineEvent],
override val serviceIdentifier: ServiceIdentifier,
kafkaEventPublisher: KafkaEventPublisher,
userTweetEntityGraphTopic: String,
userTweetEntityGraphMessageBuilder: TimelineEventToUserTweetEntityGraphBuilder,
decider: RecosInjectorDecider,
gizmoduck: Gizmoduck,
tweetypie: Tweetypie
)(
override implicit val statsReceiver: StatsReceiver)
extends EventBusProcessor[TimelineEvent] {
private val processEventDeciderCounter = statsReceiver.counter("num_process_timeline_event")
private val numFavoriteEventCounter = statsReceiver.counter("num_favorite_event")
private val numUnFavoriteEventCounter = statsReceiver.counter("num_unfavorite_event")
private val numNotFavoriteEventCounter = statsReceiver.counter("num_not_favorite_event")
private val numSelfFavoriteCounter = statsReceiver.counter("num_self_favorite_event")
private val numNullCastTweetCounter = statsReceiver.counter("num_null_cast_tweet")
private val numTweetFailSafetyLevelCounter = statsReceiver.counter("num_fail_tweetypie_safety")
private val numFavoriteUserUnsafeCounter = statsReceiver.counter("num_favorite_user_unsafe")
private val engageUserFilter = new UserFilter(gizmoduck)(statsReceiver.scope("engage_user"))
private val tweetFilter = new TweetFilter(tweetypie)
private val numProcessFavorite = statsReceiver.counter("num_process_favorite")
private val numNoProcessFavorite = statsReceiver.counter("num_no_process_favorite")
private def getFavoriteEventDetails(
favoriteEvent: FavoriteEvent
): TweetFavoriteEventDetails = {
val engagement = UserTweetEngagement(
engageUserId = favoriteEvent.userId,
engageUser = favoriteEvent.user,
action = Action.Favorite,
engagementTimeMillis = Some(favoriteEvent.eventTimeMs),
tweetId = favoriteEvent.tweetId, // the tweet, or source tweet if target tweet is a retweet
tweetDetails = favoriteEvent.tweet.map(TweetDetails) // tweet always exists
)
TweetFavoriteEventDetails(userTweetEngagement = engagement)
}
private def getUnfavoriteEventDetails(
unfavoriteEvent: UnfavoriteEvent
): TweetFavoriteEventDetails = {
val engagement = UserTweetEngagement(
engageUserId = unfavoriteEvent.userId,
engageUser = unfavoriteEvent.user,
action = Action.Unfavorite,
engagementTimeMillis = Some(unfavoriteEvent.eventTimeMs),
tweetId = unfavoriteEvent.tweetId, // the tweet, or source tweet if target tweet is a retweet
tweetDetails = unfavoriteEvent.tweet.map(TweetDetails) // tweet always exists
)
TweetFavoriteEventDetails(userTweetEngagement = engagement)
}
private def shouldProcessFavoriteEvent(event: TweetFavoriteEventDetails): Future[Boolean] = {
val engagement = event.userTweetEngagement
val engageUserId = engagement.engageUserId
val tweetId = engagement.tweetId
val authorIdOpt = engagement.tweetDetails.flatMap(_.authorId)
val isSelfFavorite = authorIdOpt.contains(engageUserId)
val isNullCastTweet = engagement.tweetDetails.forall(_.isNullCastTweet)
val isEngageUserSafeFut = engageUserFilter.filterByUserId(engageUserId)
val isTweetPassSafetyFut = tweetFilter.filterForTweetypieSafetyLevel(tweetId)
Future.join(isEngageUserSafeFut, isTweetPassSafetyFut).map {
case (isEngageUserSafe, isTweetPassSafety) =>
if (isSelfFavorite) numSelfFavoriteCounter.incr()
if (isNullCastTweet) numNullCastTweetCounter.incr()
if (!isEngageUserSafe) numFavoriteUserUnsafeCounter.incr()
if (!isTweetPassSafety) numTweetFailSafetyLevelCounter.incr()
!isSelfFavorite && !isNullCastTweet && isEngageUserSafe && isTweetPassSafety
}
}
private def processFavoriteEvent(favoriteEvent: FavoriteEvent): Future[Unit] = {
val eventDetails = getFavoriteEventDetails(favoriteEvent)
shouldProcessFavoriteEvent(eventDetails).map {
case true =>
numProcessFavorite.incr()
// Convert the event for UserTweetEntityGraph
userTweetEntityGraphMessageBuilder.processEvent(eventDetails).map { edges =>
edges.foreach { edge =>
kafkaEventPublisher.publish(edge.convertToRecosHoseMessage, userTweetEntityGraphTopic)
}
}
case false =>
numNoProcessFavorite.incr()
}
}
private def processUnFavoriteEvent(unFavoriteEvent: UnfavoriteEvent): Future[Unit] = {
if (decider.isAvailable(RecosInjectorDeciderConstants.EnableUnfavoriteEdge)) {
val eventDetails = getUnfavoriteEventDetails(unFavoriteEvent)
// Convert the event for UserTweetEntityGraph
userTweetEntityGraphMessageBuilder.processEvent(eventDetails).map { edges =>
edges.foreach { edge =>
kafkaEventPublisher.publish(edge.convertToRecosHoseMessage, userTweetEntityGraphTopic)
}
}
} else {
Future.Unit
}
}
override def processEvent(event: TimelineEvent): Future[Unit] = {
processEventDeciderCounter.incr()
event match {
case TimelineEvent.Favorite(favoriteEvent: FavoriteEvent) =>
numFavoriteEventCounter.incr()
processFavoriteEvent(favoriteEvent)
case TimelineEvent.Unfavorite(unFavoriteEvent: UnfavoriteEvent) =>
numUnFavoriteEventCounter.incr()
processUnFavoriteEvent(unFavoriteEvent)
case _ =>
numNotFavoriteEventCounter.incr()
Future.Unit
}
}
}

View File

@ -1,256 +0,0 @@
package com.twitter.recosinjector.event_processors
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.frigate.common.util.SnowflakeUtils
import com.twitter.gizmoduck.thriftscala.User
import com.twitter.recos.util.Action
import com.twitter.recos.util.Action.Action
import com.twitter.recosinjector.clients.Gizmoduck
import com.twitter.recosinjector.clients.SocialGraph
import com.twitter.recosinjector.clients.Tweetypie
import com.twitter.recosinjector.edges.TweetEventToUserTweetEntityGraphBuilder
import com.twitter.recosinjector.edges.TweetEventToUserUserGraphBuilder
import com.twitter.recosinjector.filters.TweetFilter
import com.twitter.recosinjector.filters.UserFilter
import com.twitter.recosinjector.publishers.KafkaEventPublisher
import com.twitter.recosinjector.util.TweetCreateEventDetails
import com.twitter.recosinjector.util.TweetDetails
import com.twitter.recosinjector.util.UserTweetEngagement
import com.twitter.scrooge.ThriftStructCodec
import com.twitter.tweetypie.thriftscala.Tweet
import com.twitter.tweetypie.thriftscala.TweetCreateEvent
import com.twitter.tweetypie.thriftscala.TweetEvent
import com.twitter.tweetypie.thriftscala.TweetEventData
import com.twitter.util.Future
/**
* Event processor for tweet_events EventBus stream from Tweetypie. This stream provides all the
* key events related to a new tweet, like Creation, Retweet, Quote Tweet, and Replying.
* It also carries the entities/metadata information in a tweet, including
* @ Mention, HashTag, MediaTag, URL, etc.
*/
class TweetEventProcessor(
override val eventBusStreamName: String,
override val thriftStruct: ThriftStructCodec[TweetEvent],
override val serviceIdentifier: ServiceIdentifier,
userUserGraphMessageBuilder: TweetEventToUserUserGraphBuilder,
userUserGraphTopic: String,
userTweetEntityGraphMessageBuilder: TweetEventToUserTweetEntityGraphBuilder,
userTweetEntityGraphTopic: String,
kafkaEventPublisher: KafkaEventPublisher,
socialGraph: SocialGraph,
gizmoduck: Gizmoduck,
tweetypie: Tweetypie
)(
override implicit val statsReceiver: StatsReceiver)
extends EventBusProcessor[TweetEvent] {
private val tweetCreateEventCounter = statsReceiver.counter("num_tweet_create_events")
private val nonTweetCreateEventCounter = statsReceiver.counter("num_non_tweet_create_events")
private val tweetActionStats = statsReceiver.scope("tweet_action")
private val numUrlCounter = statsReceiver.counter("num_tweet_url")
private val numMediaUrlCounter = statsReceiver.counter("num_tweet_media_url")
private val numHashTagCounter = statsReceiver.counter("num_tweet_hashtag")
private val numMentionsCounter = statsReceiver.counter("num_tweet_mention")
private val numMediatagCounter = statsReceiver.counter("num_tweet_mediatag")
private val numValidMentionsCounter = statsReceiver.counter("num_tweet_valid_mention")
private val numValidMediatagCounter = statsReceiver.counter("num_tweet_valid_mediatag")
private val numNullCastTweetCounter = statsReceiver.counter("num_null_cast_tweet")
private val numNullCastSourceTweetCounter = statsReceiver.counter("num_null_cast_source_tweet")
private val numTweetFailSafetyLevelCounter = statsReceiver.counter("num_fail_tweetypie_safety")
private val numAuthorUnsafeCounter = statsReceiver.counter("num_author_unsafe")
private val numProcessTweetCounter = statsReceiver.counter("num_process_tweet")
private val numNoProcessTweetCounter = statsReceiver.counter("num_no_process_tweet")
private val selfRetweetCounter = statsReceiver.counter("num_retweets_self")
private val engageUserFilter = new UserFilter(gizmoduck)(statsReceiver.scope("author_user"))
private val tweetFilter = new TweetFilter(tweetypie)
private def trackTweetCreateEventStats(details: TweetCreateEventDetails): Unit = {
tweetActionStats.counter(details.userTweetEngagement.action.toString).incr()
details.userTweetEngagement.tweetDetails.foreach { tweetDetails =>
tweetDetails.mentionUserIds.foreach(mention => numMentionsCounter.incr(mention.size))
tweetDetails.mediatagUserIds.foreach(mediatag => numMediatagCounter.incr(mediatag.size))
tweetDetails.urls.foreach(urls => numUrlCounter.incr(urls.size))
tweetDetails.mediaUrls.foreach(mediaUrls => numMediaUrlCounter.incr(mediaUrls.size))
tweetDetails.hashtags.foreach(hashtags => numHashTagCounter.incr(hashtags.size))
}
details.validMentionUserIds.foreach(mentions => numValidMentionsCounter.incr(mentions.size))
details.validMediatagUserIds.foreach(mediatags => numValidMediatagCounter.incr(mediatags.size))
}
/**
* Given a created tweet, return what type of tweet it is, i.e. Tweet, Retweet, Quote, or Reply
* Retweet, Quote, or Reply are responsive actions to a source tweet, so for these tweets,
* we also return the tweet id and author of the source tweet (ex. the tweet being retweeted).
*/
private def getTweetAction(tweetDetails: TweetDetails): Action = {
(tweetDetails.replySourceId, tweetDetails.retweetSourceId, tweetDetails.quoteSourceId) match {
case (Some(_), _, _) =>
Action.Reply
case (_, Some(_), _) =>
Action.Retweet
case (_, _, Some(_)) =>
Action.Quote
case _ =>
Action.Tweet
}
}
/**
* Given a list of mentioned users and mediatagged users in the tweet, return the users who
* actually follow the source user.
*/
private def getFollowedByIds(
sourceUserId: Long,
mentionUserIds: Option[Seq[Long]],
mediatagUserIds: Option[Seq[Long]]
): Future[Seq[Long]] = {
val uniqueEntityUserIds =
(mentionUserIds.getOrElse(Nil) ++ mediatagUserIds.getOrElse(Nil)).distinct
if (uniqueEntityUserIds.isEmpty) {
Future.Nil
} else {
socialGraph.followedByNotMutedBy(sourceUserId, uniqueEntityUserIds)
}
}
private def getSourceTweet(tweetDetails: TweetDetails): Future[Option[Tweet]] = {
tweetDetails.sourceTweetId match {
case Some(sourceTweetId) =>
tweetypie.getTweet(sourceTweetId)
case _ =>
Future.None
}
}
/**
* Extract and return the details when the source user created a new tweet.
*/
private def getTweetDetails(
tweet: Tweet,
engageUser: User
): Future[TweetCreateEventDetails] = {
val tweetDetails = TweetDetails(tweet)
val action = getTweetAction(tweetDetails)
val tweetCreationTimeMillis = SnowflakeUtils.tweetCreationTime(tweet.id).map(_.inMilliseconds)
val engageUserId = engageUser.id
val userTweetEngagement = UserTweetEngagement(
engageUserId = engageUserId,
engageUser = Some(engageUser),
action = action,
engagementTimeMillis = tweetCreationTimeMillis,
tweetId = tweet.id,
tweetDetails = Some(tweetDetails)
)
val sourceTweetFut = getSourceTweet(tweetDetails)
val followedByIdsFut = getFollowedByIds(
engageUserId,
tweetDetails.mentionUserIds,
tweetDetails.mediatagUserIds
)
Future.join(followedByIdsFut, sourceTweetFut).map {
case (followedByIds, sourceTweet) =>
TweetCreateEventDetails(
userTweetEngagement = userTweetEngagement,
validEntityUserIds = followedByIds,
sourceTweetDetails = sourceTweet.map(TweetDetails)
)
}
}
/**
* Exclude any Retweets of one's own tweets
*/
private def isEventSelfRetweet(tweetEvent: TweetCreateEventDetails): Boolean = {
(tweetEvent.userTweetEngagement.action == Action.Retweet) &&
tweetEvent.userTweetEngagement.tweetDetails.exists(
_.sourceTweetUserId.contains(
tweetEvent.userTweetEngagement.engageUserId
))
}
private def isTweetPassSafetyFilter(tweetEvent: TweetCreateEventDetails): Future[Boolean] = {
tweetEvent.userTweetEngagement.action match {
case Action.Reply | Action.Retweet | Action.Quote =>
tweetEvent.userTweetEngagement.tweetDetails
.flatMap(_.sourceTweetId).map { sourceTweetId =>
tweetFilter.filterForTweetypieSafetyLevel(sourceTweetId)
}.getOrElse(Future(false))
case Action.Tweet =>
tweetFilter.filterForTweetypieSafetyLevel(tweetEvent.userTweetEngagement.tweetId)
}
}
private def shouldProcessTweetEvent(event: TweetCreateEventDetails): Future[Boolean] = {
val engagement = event.userTweetEngagement
val engageUserId = engagement.engageUserId
val isNullCastTweet = engagement.tweetDetails.forall(_.isNullCastTweet)
val isNullCastSourceTweet = event.sourceTweetDetails.exists(_.isNullCastTweet)
val isSelfRetweet = isEventSelfRetweet(event)
val isEngageUserSafeFut = engageUserFilter.filterByUserId(engageUserId)
val isTweetPassSafetyFut = isTweetPassSafetyFilter(event)
Future.join(isEngageUserSafeFut, isTweetPassSafetyFut).map {
case (isEngageUserSafe, isTweetPassSafety) =>
if (isNullCastTweet) numNullCastTweetCounter.incr()
if (isNullCastSourceTweet) numNullCastSourceTweetCounter.incr()
if (!isEngageUserSafe) numAuthorUnsafeCounter.incr()
if (isSelfRetweet) selfRetweetCounter.incr()
if (!isTweetPassSafety) numTweetFailSafetyLevelCounter.incr()
!isNullCastTweet &&
!isNullCastSourceTweet &&
!isSelfRetweet &&
isEngageUserSafe &&
isTweetPassSafety
}
}
override def processEvent(event: TweetEvent): Future[Unit] = {
event.data match {
case TweetEventData.TweetCreateEvent(event: TweetCreateEvent) =>
getTweetDetails(
tweet = event.tweet,
engageUser = event.user
).flatMap { eventWithDetails =>
tweetCreateEventCounter.incr()
shouldProcessTweetEvent(eventWithDetails).map {
case true =>
numProcessTweetCounter.incr()
trackTweetCreateEventStats(eventWithDetails)
// Convert the event for UserUserGraph
userUserGraphMessageBuilder.processEvent(eventWithDetails).map { edges =>
edges.foreach { edge =>
kafkaEventPublisher.publish(edge.convertToRecosHoseMessage, userUserGraphTopic)
}
}
// Convert the event for UserTweetEntityGraph
userTweetEntityGraphMessageBuilder.processEvent(eventWithDetails).map { edges =>
edges.foreach { edge =>
kafkaEventPublisher
.publish(edge.convertToRecosHoseMessage, userTweetEntityGraphTopic)
}
}
case false =>
numNoProcessTweetCounter.incr()
}
}
case _ =>
nonTweetCreateEventCounter.incr()
Future.Unit
}
}
}

View File

@ -1,10 +0,0 @@
scala_library(
platform = "java11",
strict_deps = False,
tags = ["bazel-compatible"],
dependencies = [
"finagle/finagle-stats",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/clients",
"src/thrift/com/twitter/gizmoduck:thrift-scala",
],
)

View File

@ -1,34 +0,0 @@
package com.twitter.recosinjector.filters
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.recosinjector.clients.Tweetypie
import com.twitter.util.Future
/**
* Filters tweets that are null cast, i.e. tweet is not delivered to a user's followers,
* not shown in the user's timeline, and does not appear in search results.
* They are mainly ads tweets.
*/
class NullCastTweetFilter(
tweetypie: Tweetypie
)(
implicit statsReceiver: StatsReceiver) {
private val stats = statsReceiver.scope(this.getClass.getSimpleName)
private val requests = stats.counter("requests")
private val filtered = stats.counter("filtered")
// Return Future(True) to keep the Tweet.
def filter(tweetId: Long): Future[Boolean] = {
requests.incr()
tweetypie
.getTweet(tweetId)
.map { tweetOpt =>
// If the null cast bit is Some(true), drop the tweet.
val isNullCastTweet = tweetOpt.flatMap(_.coreData).exists(_.nullcast)
if (isNullCastTweet) {
filtered.incr()
}
!isNullCastTweet
}
}
}

View File

@ -1,31 +0,0 @@
package com.twitter.recosinjector.filters
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.recosinjector.clients.Tweetypie
import com.twitter.util.Future
class TweetFilter(
tweetypie: Tweetypie
)(
implicit statsReceiver: StatsReceiver) {
private val stats = statsReceiver.scope(this.getClass.getSimpleName)
private val requests = stats.counter("requests")
private val filtered = stats.counter("filtered")
/**
* Query Tweetypie to see if we can fetch a tweet object successfully. TweetyPie applies a safety
* filter and will not return the tweet object if the filter does not pass.
*/
def filterForTweetypieSafetyLevel(tweetId: Long): Future[Boolean] = {
requests.incr()
tweetypie
.getTweet(tweetId)
.map {
case Some(_) =>
true
case _ =>
filtered.incr()
false
}
}
}

View File

@ -1,69 +0,0 @@
package com.twitter.recosinjector.filters
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.gizmoduck.thriftscala.{LabelValue, User}
import com.twitter.recosinjector.clients.Gizmoduck
import com.twitter.util.Future
class UserFilter(
gizmoduck: Gizmoduck
)(
implicit statsReceiver: StatsReceiver) {
private val stats = statsReceiver.scope(this.getClass.getSimpleName)
private val requests = stats.counter("requests")
private val filtered = stats.counter("filtered")
private def isUnsafe(user: User): Boolean =
user.safety.exists { s =>
s.deactivated || s.suspended || s.restricted || s.nsfwUser || s.nsfwAdmin || s.isProtected
}
private def hasNsfwHighPrecisionLabel(user: User): Boolean =
user.labels.exists {
_.labels.exists(_.labelValue == LabelValue.NsfwHighPrecision)
}
/**
* NOTE: This will by-pass Gizmoduck's safety level, and might allow invalid users to pass filter.
* Consider using filterByUserId instead.
* Return true if the user is valid, otherwise return false.
* It will first attempt to use the user object provided by the caller, and will call Gizmoduck
* to back fill if the caller does not provide it. This helps reduce Gizmoduck traffic.
*/
def filterByUser(
userId: Long,
userOpt: Option[User] = None
): Future[Boolean] = {
requests.incr()
val userFut = userOpt match {
case Some(user) => Future(Some(user))
case _ => gizmoduck.getUser(userId)
}
userFut.map(_.exists { user =>
val isValidUser = !isUnsafe(user) && !hasNsfwHighPrecisionLabel(user)
if (!isValidUser) filtered.incr()
isValidUser
})
}
/**
* Given a userId, return true if the user is valid. This id done in 2 steps:
* 1. Applying Gizmoduck's safety level while querying for the user from Gizmoduck
* 2. If a user passes Gizmoduck's safety level, check its specific user status
*/
def filterByUserId(userId: Long): Future[Boolean] = {
requests.incr()
gizmoduck
.getUser(userId)
.map { userOpt =>
val isValidUser = userOpt.exists { user =>
!(isUnsafe(user) || hasNsfwHighPrecisionLabel(user))
}
if (!isValidUser) {
filtered.incr()
}
isValidUser
}
}
}

View File

@ -1,12 +0,0 @@
scala_library(
platform = "java11",
strict_deps = False,
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/org/apache/thrift:libthrift",
"finatra-internal/messaging/kafka/src/main/scala",
"servo/repo/src/main/scala",
"src/thrift/com/twitter/recos:recos-injector-scala",
"src/thrift/com/twitter/recos:recos-internal-scala",
],
)

View File

@ -1,54 +0,0 @@
package com.twitter.recosinjector.publishers
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.thrift.ClientId
import com.twitter.finatra.kafka.producers.FinagleKafkaProducerBuilder
import com.twitter.finatra.kafka.serde.ScalaSerdes
import com.twitter.recos.internal.thriftscala.RecosHoseMessage
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.StringSerializer
case class KafkaEventPublisher(
kafkaDest: String,
outputKafkaTopicPrefix: String,
clientId: ClientId,
truststoreLocation: String) {
private val producer = FinagleKafkaProducerBuilder[String, RecosHoseMessage]()
.dest(kafkaDest)
.clientId(clientId.name)
.keySerializer(new StringSerializer)
.valueSerializer(ScalaSerdes.Thrift[RecosHoseMessage].serializer)
.withConfig(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString)
.withConfig(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation)
.withConfig(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
.withConfig(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")
.withConfig(SaslConfigs.SASL_KERBEROS_SERVER_NAME, "kafka")
// Use Native Kafka Client
.buildClient()
def publish(
message: RecosHoseMessage,
topic: String
)(
implicit statsReceiver: StatsReceiver
): Unit = {
val topicName = s"${outputKafkaTopicPrefix}_$topic"
// Kafka Producer is thread-safe. No extra Future-pool protect.
producer.send(new ProducerRecord(topicName, message))
statsReceiver.counter(topicName + "_written_msg_success").incr()
}
}
object KafkaEventPublisher {
// Kafka topics available for publishing
val UserVideoTopic = "user_video"
val UserTweetEntityTopic = "user_tweet_entity"
val UserUserTopic = "user_user"
val UserAdTopic = "user_tweet"
val UserTweetPlusTopic = "user_tweet_plus"
}

View File

@ -1,12 +0,0 @@
scala_library(
platform = "java11",
strict_deps = False,
tags = ["bazel-compatible"],
dependencies = [
"3rdparty/jvm/com/twitter/graphjet",
"finagle/finagle-stats",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/clients",
"src/scala/com/twitter/recos/util:recos-util",
"src/thrift/com/twitter/tweetypie:tweet-scala",
],
)

View File

@ -1,126 +0,0 @@
package com.twitter.recosinjector.util
import com.twitter.frigate.common.base.TweetUtil
import com.twitter.gizmoduck.thriftscala.User
import com.twitter.recos.util.Action.Action
import com.twitter.tweetypie.thriftscala.Tweet
/**
* This is used to store information about a newly created tweet
* @param validEntityUserIds For users mentioned or mediatagged in the tweet, these follow the
* engage user and only they are are considered valid
* @param sourceTweetDetails For Reply, Quote, or RT, source tweet is the tweet being actioned on
*/
case class TweetCreateEventDetails(
userTweetEngagement: UserTweetEngagement,
validEntityUserIds: Seq[Long],
sourceTweetDetails: Option[TweetDetails]) {
// A mention is only valid if the mentioned user follows the source user
val validMentionUserIds: Option[Seq[Long]] = {
userTweetEngagement.tweetDetails.flatMap(_.mentionUserIds.map(_.intersect(validEntityUserIds)))
}
// A mediatag is only valid if the mediatagged user follows the source user
val validMediatagUserIds: Option[Seq[Long]] = {
userTweetEngagement.tweetDetails.flatMap(_.mediatagUserIds.map(_.intersect(validEntityUserIds)))
}
}
/**
* Stores information about a favorite/unfav engagement.
* NOTE: This could either be Likes, or UNLIKEs (i.e. when user cancels the Like)
* @param userTweetEngagement the engagement details
*/
case class TweetFavoriteEventDetails(
userTweetEngagement: UserTweetEngagement)
/**
* Stores information about a unified user action engagement.
* @param userTweetEngagement the engagement details
*/
case class UuaEngagementEventDetails(
userTweetEngagement: UserTweetEngagement)
/**
* Details about a user-tweet engagement, like when a user tweeted/liked a tweet
* @param engageUserId User that engaged with the tweet
* @param action The action the user took on the tweet
* @param tweetId The type of engagement the user took on the tweet
*/
case class UserTweetEngagement(
engageUserId: Long,
engageUser: Option[User],
action: Action,
engagementTimeMillis: Option[Long],
tweetId: Long,
tweetDetails: Option[TweetDetails])
/**
* Helper class that decomposes a tweet object and provides related details about this tweet
*/
case class TweetDetails(tweet: Tweet) {
val authorId: Option[Long] = tweet.coreData.map(_.userId)
val urls: Option[Seq[String]] = tweet.urls.map(_.map(_.url))
val mediaUrls: Option[Seq[String]] = tweet.media.map(_.map(_.expandedUrl))
val hashtags: Option[Seq[String]] = tweet.hashtags.map(_.map(_.text))
// mentionUserIds include reply user ids at the beginning of a tweet
val mentionUserIds: Option[Seq[Long]] = tweet.mentions.map(_.flatMap(_.userId))
val mediatagUserIds: Option[Seq[Long]] = tweet.mediaTags.map {
_.tagMap.flatMap {
case (_, mediaTag) => mediaTag.flatMap(_.userId)
}.toSeq
}
val replySourceId: Option[Long] = tweet.coreData.flatMap(_.reply.flatMap(_.inReplyToStatusId))
val replyUserId: Option[Long] = tweet.coreData.flatMap(_.reply.map(_.inReplyToUserId))
val retweetSourceId: Option[Long] = tweet.coreData.flatMap(_.share.map(_.sourceStatusId))
val retweetUserId: Option[Long] = tweet.coreData.flatMap(_.share.map(_.sourceUserId))
val quoteSourceId: Option[Long] = tweet.quotedTweet.map(_.tweetId)
val quoteUserId: Option[Long] = tweet.quotedTweet.map(_.userId)
val quoteTweetUrl: Option[String] = tweet.quotedTweet.flatMap(_.permalink.map(_.shortUrl))
//If the tweet is retweet/reply/quote, this is the tweet that the new tweet responds to
val (sourceTweetId, sourceTweetUserId) = {
(replySourceId, retweetSourceId, quoteSourceId) match {
case (Some(replyId), _, _) =>
(Some(replyId), replyUserId)
case (_, Some(retweetId), _) =>
(Some(retweetId), retweetUserId)
case (_, _, Some(quoteId)) =>
(Some(quoteId), quoteUserId)
case _ =>
(None, None)
}
}
// Boolean information
val hasPhoto: Boolean = TweetUtil.containsPhotoTweet(tweet)
val hasVideo: Boolean = TweetUtil.containsVideoTweet(tweet)
// TweetyPie does not populate url fields in a quote tweet create event, even though we
// consider quote tweets as url tweets. This boolean helps make up for it.
// Details: https://groups.google.com/a/twitter.com/d/msg/eng/BhK1XAcSSWE/F8Gc4_5uDwAJ
val hasQuoteTweetUrl: Boolean = tweet.quotedTweet.exists(_.permalink.isDefined)
val hasUrl: Boolean = this.urls.exists(_.nonEmpty) || hasQuoteTweetUrl
val hasHashtag: Boolean = this.hashtags.exists(_.nonEmpty)
val isCard: Boolean = hasUrl | hasPhoto | hasVideo
implicit def bool2Long(b: Boolean): Long = if (b) 1L else 0L
// Return a hashed long that contains card type information of the tweet
val cardInfo: Long = isCard | (hasUrl << 1) | (hasPhoto << 2) | (hasVideo << 3)
// nullcast tweet is one that is purposefully not broadcast to followers, ex. an ad tweet.
val isNullCastTweet: Boolean = tweet.coreData.exists(_.nullcast)
}

View File

@ -1,24 +0,0 @@
scala_library(
platform = "java11",
strict_deps = False,
tags = ["bazel-compatible"],
dependencies = [
"eventbus/client",
"kafka/finagle-kafka/finatra-kafka/src/main/scala",
"kafka/libs/src/main/scala/com/twitter/kafka/client/headers",
"kafka/libs/src/main/scala/com/twitter/kafka/client/processor",
"recos-injector/server/config",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/clients",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/config",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/decider",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/edges",
"recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers",
"src/thrift/com/twitter/clientapp/gen:clientapp-scala",
"src/thrift/com/twitter/gizmoduck:user-thrift-scala",
"src/thrift/com/twitter/socialgraph:thrift-scala",
"src/thrift/com/twitter/timelineservice/server/internal:thrift-scala",
"src/thrift/com/twitter/tweetypie:events-scala",
"src/thrift/com/twitter/tweetypie:tweet-scala",
"unified_user_actions/thrift/src/main/thrift/com/twitter/unified_user_actions:unified_user_actions-scala",
],
)

View File

@ -1,181 +0,0 @@
package com.twitter.recosinjector.uua_processors
import org.apache.kafka.clients.consumer.ConsumerRecord
import com.twitter.finatra.kafka.serde.UnKeyed
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.recos.util.Action
import com.twitter.recos.util.Action.Action
import com.twitter.recosinjector.clients.Gizmoduck
import com.twitter.recosinjector.clients.Tweetypie
import com.twitter.recosinjector.edges.UnifiedUserActionToUserVideoGraphBuilder
import com.twitter.recosinjector.edges.UnifiedUserActionToUserAdGraphBuilder
import com.twitter.recosinjector.edges.UnifiedUserActionToUserTweetGraphPlusBuilder
import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction
import com.twitter.unified_user_actions.thriftscala.ActionType
import com.twitter.unified_user_actions.thriftscala.Item
import com.twitter.recosinjector.filters.UserFilter
import com.twitter.recosinjector.publishers.KafkaEventPublisher
import com.twitter.recosinjector.util.TweetDetails
import com.twitter.recosinjector.util.UserTweetEngagement
import com.twitter.recosinjector.util.UuaEngagementEventDetails
import com.twitter.unified_user_actions.thriftscala.NotificationContent
import com.twitter.unified_user_actions.thriftscala.NotificationInfo
import com.twitter.util.Future
class UnifiedUserActionProcessor(
gizmoduck: Gizmoduck,
tweetypie: Tweetypie,
kafkaEventPublisher: KafkaEventPublisher,
userVideoGraphTopic: String,
userVideoGraphBuilder: UnifiedUserActionToUserVideoGraphBuilder,
userAdGraphTopic: String,
userAdGraphBuilder: UnifiedUserActionToUserAdGraphBuilder,
userTweetGraphPlusTopic: String,
userTweetGraphPlusBuilder: UnifiedUserActionToUserTweetGraphPlusBuilder
)(
implicit statsReceiver: StatsReceiver) {
val messagesProcessedCount = statsReceiver.counter("messages_processed")
val eventsByTypeCounts = statsReceiver.scope("events_by_type")
private val numSelfEngageCounter = statsReceiver.counter("num_self_engage_event")
private val numTweetFailSafetyLevelCounter = statsReceiver.counter("num_fail_tweetypie_safety")
private val numNullCastTweetCounter = statsReceiver.counter("num_null_cast_tweet")
private val numEngageUserUnsafeCounter = statsReceiver.counter("num_engage_user_unsafe")
private val engageUserFilter = new UserFilter(gizmoduck)(statsReceiver.scope("engage_user"))
private val numNoProcessTweetCounter = statsReceiver.counter("num_no_process_tweet")
private val numProcessTweetCounter = statsReceiver.counter("num_process_tweet")
private def getUuaEngagementEventDetails(
unifiedUserAction: UnifiedUserAction
): Option[Future[UuaEngagementEventDetails]] = {
val userIdOpt = unifiedUserAction.userIdentifier.userId
val tweetIdOpt = unifiedUserAction.item match {
case Item.TweetInfo(tweetInfo) => Some(tweetInfo.actionTweetId)
case Item.NotificationInfo(
NotificationInfo(_, NotificationContent.TweetNotification(notification))) =>
Some(notification.tweetId)
case _ => None
}
val timestamp = unifiedUserAction.eventMetadata.sourceTimestampMs
val action = getTweetAction(unifiedUserAction.actionType)
tweetIdOpt
.flatMap { tweetId =>
userIdOpt.map { engageUserId =>
val tweetFut = tweetypie.getTweet(tweetId)
tweetFut.map { tweetOpt =>
val tweetDetailsOpt = tweetOpt.map(TweetDetails)
val engagement = UserTweetEngagement(
engageUserId = engageUserId,
action = action,
engagementTimeMillis = Some(timestamp),
tweetId = tweetId,
engageUser = None,
tweetDetails = tweetDetailsOpt
)
UuaEngagementEventDetails(engagement)
}
}
}
}
private def getTweetAction(action: ActionType): Action = {
action match {
case ActionType.ClientTweetVideoPlayback50 => Action.VideoPlayback50
case ActionType.ClientTweetClick => Action.Click
case ActionType.ClientTweetVideoPlayback75 => Action.VideoPlayback75
case ActionType.ClientTweetVideoQualityView => Action.VideoQualityView
case ActionType.ServerTweetFav => Action.Favorite
case ActionType.ServerTweetReply => Action.Reply
case ActionType.ServerTweetRetweet => Action.Retweet
case ActionType.ClientTweetQuote => Action.Quote
case ActionType.ClientNotificationOpen => Action.NotificationOpen
case ActionType.ClientTweetEmailClick => Action.EmailClick
case ActionType.ClientTweetShareViaBookmark => Action.Share
case ActionType.ClientTweetShareViaCopyLink => Action.Share
case ActionType.ClientTweetSeeFewer => Action.TweetSeeFewer
case ActionType.ClientTweetNotRelevant => Action.TweetNotRelevant
case ActionType.ClientTweetNotInterestedIn => Action.TweetNotInterestedIn
case ActionType.ServerTweetReport => Action.TweetReport
case ActionType.ClientTweetMuteAuthor => Action.TweetMuteAuthor
case ActionType.ClientTweetBlockAuthor => Action.TweetBlockAuthor
case _ => Action.UnDefined
}
}
private def shouldProcessTweetEngagement(
event: UuaEngagementEventDetails,
isAdsUseCase: Boolean = false
): Future[Boolean] = {
val engagement = event.userTweetEngagement
val engageUserId = engagement.engageUserId
val authorIdOpt = engagement.tweetDetails.flatMap(_.authorId)
val isSelfEngage = authorIdOpt.contains(engageUserId)
val isNullCastTweet = engagement.tweetDetails.forall(_.isNullCastTweet)
val isEngageUserSafeFut = engageUserFilter.filterByUserId(engageUserId)
val isTweetPassSafety =
engagement.tweetDetails.isDefined // Tweetypie can fetch a tweet object successfully
isEngageUserSafeFut.map { isEngageUserSafe =>
if (isSelfEngage) numSelfEngageCounter.incr()
if (isNullCastTweet) numNullCastTweetCounter.incr()
if (!isEngageUserSafe) numEngageUserUnsafeCounter.incr()
if (!isTweetPassSafety) numTweetFailSafetyLevelCounter.incr()
!isSelfEngage && (!isNullCastTweet && !isAdsUseCase || isNullCastTweet && isAdsUseCase) && isEngageUserSafe && isTweetPassSafety
}
}
def apply(record: ConsumerRecord[UnKeyed, UnifiedUserAction]): Future[Unit] = {
messagesProcessedCount.incr()
val unifiedUserAction = record.value
eventsByTypeCounts.counter(unifiedUserAction.actionType.toString).incr()
getTweetAction(unifiedUserAction.actionType) match {
case Action.UnDefined =>
numNoProcessTweetCounter.incr()
Future.Unit
case action =>
getUuaEngagementEventDetails(unifiedUserAction)
.map {
_.flatMap { detail =>
// The following cases are set up specifically for an ads relevance demo.
val actionForAds = Set(Action.Click, Action.Favorite, Action.VideoPlayback75)
if (actionForAds.contains(action))
shouldProcessTweetEngagement(detail, isAdsUseCase = true).map {
case true =>
userAdGraphBuilder.processEvent(detail).map { edges =>
edges.foreach { edge =>
kafkaEventPublisher
.publish(edge.convertToRecosHoseMessage, userAdGraphTopic)
}
}
numProcessTweetCounter.incr()
case _ =>
}
shouldProcessTweetEngagement(detail).map {
case true =>
userVideoGraphBuilder.processEvent(detail).map { edges =>
edges.foreach { edge =>
kafkaEventPublisher
.publish(edge.convertToRecosHoseMessage, userVideoGraphTopic)
}
}
userTweetGraphPlusBuilder.processEvent(detail).map { edges =>
edges.foreach { edge =>
kafkaEventPublisher
.publish(edge.convertToRecosHoseMessage, userTweetGraphPlusTopic)
}
}
numProcessTweetCounter.incr()
case _ =>
}
}
}.getOrElse(Future.Unit)
}
}
}

View File

@ -1,71 +0,0 @@
package com.twitter.recosinjector.uua_processors
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finatra.kafka.consumers.FinagleKafkaConsumerBuilder
import com.twitter.finatra.kafka.domain.KafkaGroupId
import com.twitter.finatra.kafka.domain.SeekStrategy
import com.twitter.finatra.kafka.serde.ScalaSerdes
import com.twitter.finatra.kafka.serde.UnKeyed
import com.twitter.finatra.kafka.serde.UnKeyedSerde
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction
import com.twitter.kafka.client.processor.AtLeastOnceProcessor
import com.twitter.kafka.client.processor.ThreadSafeKafkaConsumerClient
import com.twitter.conversions.StorageUnitOps._
class UnifiedUserActionsConsumer(
processor: UnifiedUserActionProcessor,
truststoreLocation: String
)(
implicit statsReceiver: StatsReceiver) {
import UnifiedUserActionsConsumer._
private val kafkaClient = new ThreadSafeKafkaConsumerClient[UnKeyed, UnifiedUserAction](
FinagleKafkaConsumerBuilder[UnKeyed, UnifiedUserAction]()
.groupId(KafkaGroupId(uuaRecosInjectorGroupId))
.keyDeserializer(UnKeyedSerde.deserializer)
.valueDeserializer(ScalaSerdes.Thrift[UnifiedUserAction].deserializer)
.dest(uuaDest)
.maxPollRecords(maxPollRecords)
.maxPollInterval(maxPollInterval)
.fetchMax(fetchMax)
.seekStrategy(SeekStrategy.END)
.enableAutoCommit(false) // AtLeastOnceProcessor performs commits manually
.withConfig(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString)
.withConfig(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation)
.withConfig(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
.withConfig(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")
.withConfig(SaslConfigs.SASL_KERBEROS_SERVER_NAME, "kafka")
.config)
val atLeastOnceProcessor: AtLeastOnceProcessor[UnKeyed, UnifiedUserAction] = {
AtLeastOnceProcessor[UnKeyed, UnifiedUserAction](
name = processorName,
topic = uuaTopic,
consumer = kafkaClient,
processor = processor.apply,
maxPendingRequests = maxPendingRequests,
workerThreads = workerThreads,
commitIntervalMs = commitIntervalMs,
statsReceiver = statsReceiver.scope(processorName)
)
}
}
object UnifiedUserActionsConsumer {
val maxPollRecords = 1000
val maxPollInterval = 5.minutes
val fetchMax = 1.megabytes
val maxPendingRequests = 1000
val workerThreads = 16
val commitIntervalMs = 10.seconds.inMilliseconds
val processorName = "unified_user_actions_processor"
val uuaTopic = "unified_user_actions_engagements"
val uuaDest = "/s/kafka/bluebird-1:kafka-tls"
val uuaRecosInjectorGroupId = "recos-injector"
}