diff --git a/recos-injector/BUILD.bazel b/recos-injector/BUILD.bazel deleted file mode 100644 index dbd3d0619..000000000 --- a/recos-injector/BUILD.bazel +++ /dev/null @@ -1 +0,0 @@ -# This prevents SQ query from grabbing //:all since it traverses up once to find a BUILD (DPB-14048) diff --git a/recos-injector/CONFIG.ini b/recos-injector/CONFIG.ini deleted file mode 100644 index b3f176acb..000000000 --- a/recos-injector/CONFIG.ini +++ /dev/null @@ -1,10 +0,0 @@ -; See http://go/CONFIG.ini - -[jira] -project: SD - -[docbird] -project_name = recos-injector - -[kite] -project: recos-injector diff --git a/recos-injector/README.md b/recos-injector/README.md deleted file mode 100644 index a391578c2..000000000 --- a/recos-injector/README.md +++ /dev/null @@ -1,40 +0,0 @@ -# Recos-Injector - -Recos-Injector is a streaming event processor used to build input streams for GraphJet-based services. It is a general-purpose tool that consumes arbitrary incoming event streams (e.g., Fav, RT, Follow, client_events, etc.), applies filtering, and combines and publishes cleaned up events to corresponding GraphJet services. Each GraphJet-based service subscribes to a dedicated Kafka topic, and Recos-Injector enables GraphJet-based services to consume any event they want. - -## How to run Recos-Injector server tests - -You can run tests by using the following command from your project's root directory: - - $ bazel build recos-injector/... - $ bazel test recos-injector/... - -## How to run recos-injector-server in development on a local machine - -The simplest way to stand up a service is to run it locally. To run -recos-injector-server in development mode, compile the project and then -execute it with `bazel run`: - - $ bazel build recos-injector/server:bin - $ bazel run recos-injector/server:bin - -A tunnel can be set up in order for downstream queries to work properly. 
-Upon successful server startup, try to `curl` its admin endpoint in another -terminal: - - $ curl -s localhost:9990/admin/ping - pong - -Run `curl -s localhost:9990/admin` to see a list of all available admin endpoints. - -## Querying Recos-Injector server from a Scala console - -Recos-Injector does not have a Thrift endpoint. Instead, it reads Event Bus and Kafka queues and writes to the Recos-Injector Kafka. - -## Generating a package for deployment - -To package your service into a zip file for deployment, run: - - $ bazel bundle recos-injector/server:bin --bundle-jvm-archive=zip - -If the command is successful, a file named `dist/recos-injector-server.zip` will be created. diff --git a/recos-injector/server/BUILD b/recos-injector/server/BUILD deleted file mode 100644 index 55e2ab35b..000000000 --- a/recos-injector/server/BUILD +++ /dev/null @@ -1,43 +0,0 @@ -target( - name = "server", - tags = ["bazel-compatible"], - dependencies = [ - "recos-injector/server/src/main/scala/com/twitter/recosinjector", - ], -) - -test_suite( - name = "tests", - tags = ["bazel-compatible"], - dependencies = [ - "recos-injector/server/src/test/scala/com/twitter/recosinjector", - ], -) - -jvm_binary( - name = "bin", - basename = "recos-injector-server", - main = "com.twitter.recosinjector.Main", - platform = "java11", - runtime_platform = "java11", - tags = [ - "bazel-compatible:migrated", - ], - dependencies = [ - ":server", - "3rdparty/jvm/org/slf4j:slf4j-jdk14", - ], -) - -jvm_app( - name = "bundle", - basename = "recos-injector", - binary = ":bin", - bundles = [bundle( - fileset = ["config/*"], - owning_target = "recos-injector/server/config:files", - )], - tags = [ - "bazel-compatible:migrated", - ], -) diff --git a/recos-injector/server/config/BUILD b/recos-injector/server/config/BUILD deleted file mode 100644 index 68161409f..000000000 --- a/recos-injector/server/config/BUILD +++ /dev/null @@ -1,20 +0,0 @@ -resources( - sources = [ - "!*.pyc", - "!BUILD*", - "*", - ], - tags 
= ["bazel-compatible"], -) - -# Created for Bazel compatibility. -# In Bazel, loose files must be part of a target to be included into a bundle. -# See also http://go/bazel-compatibility/bundle_does_not_match_any_files -files( - name = "files", - sources = [ - "!BUILD", - "**/*", - ], - tags = ["bazel-compatible"], -) diff --git a/recos-injector/server/config/change_log_config.ini b/recos-injector/server/config/change_log_config.ini deleted file mode 100755 index 6a708e921..000000000 --- a/recos-injector/server/config/change_log_config.ini +++ /dev/null @@ -1,7 +0,0 @@ -[Configs] -DCS = all -ROLE = recos-injector -JOB = recos-injector -ENV = prod -PACKAGE = recos-injector-release -PATH = recos-injector diff --git a/recos-injector/server/config/decider.yml b/recos-injector/server/config/decider.yml deleted file mode 100644 index 13cb5262d..000000000 --- a/recos-injector/server/config/decider.yml +++ /dev/null @@ -1,11 +0,0 @@ -tweet_event_transformer_user_tweet_entity_edges: - comment: "Enables the generation of UserTweetEntity edges in tweet event transformer" - default_availability: 0 - -enable_emit_tweet_edge_from_reply: - comment: "Decides when processing a Reply edge, whether to generate a Tweet edge for it as well" - default_availability: 0 - -enable_unfavorite_edge: - comment: "Decides when processing a UnfavoriteEvent from Timeline events, whether to process unfav edges" - default_availability: 0 diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/BUILD b/recos-injector/server/src/main/scala/com/twitter/recosinjector/BUILD deleted file mode 100644 index ec46c6a41..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/BUILD +++ /dev/null @@ -1,40 +0,0 @@ -scala_library( - platform = "java11", - strict_deps = False, - tags = [ - "bazel-compatible", - ], - dependencies = [ - "3rdparty/jvm/io/netty:netty4-tcnative-boringssl-static", - "3rdparty/jvm/org/apache/thrift:libthrift", - "eventbus/client", - 
"finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/authentication", - "finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/client", - "finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/server", - "finagle/finagle-core/src/main", - "finagle/finagle-http/src/main/scala", - "finagle/finagle-stats", - "finagle/finagle-thriftmux", - "recos-injector/server/config", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/clients", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/config", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/decider", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/edges", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/uua_processors", - "src/thrift/com/twitter/clientapp/gen:clientapp-scala", - "src/thrift/com/twitter/gizmoduck:user-thrift-scala", - "src/thrift/com/twitter/socialgraph:thrift-scala", - "src/thrift/com/twitter/timelineservice/server/internal:thrift-scala", - "src/thrift/com/twitter/tweetypie:events-scala", - "src/thrift/com/twitter/tweetypie:tweet-scala", - "thrift-web-forms", - "twitter-server-internal", - "twitter-server/server/src/main/scala", - "twitter-server/slf4j-jdk14/src/main/scala/com/twitter/server/logging", - "util/util-app", - "util/util-logging/src/main/scala", - "util/util-stats/src/main/scala", - ], -) diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/Main.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/Main.scala deleted file mode 100644 index be8d147b1..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/Main.scala +++ /dev/null @@ -1,213 +0,0 @@ -package com.twitter.recosinjector - -import com.twitter.app.Flag -import com.twitter.finagle.http.HttpMuxer -import 
com.twitter.finagle.mtls.authentication.ServiceIdentifier -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.frigate.common.util.ElfOwlFilter -import com.twitter.recosinjector.clients.Gizmoduck -import com.twitter.recosinjector.clients.RecosHoseEntitiesCache -import com.twitter.recosinjector.clients.SocialGraph -import com.twitter.recosinjector.clients.Tweetypie -import com.twitter.recosinjector.clients.UrlResolver -import com.twitter.recosinjector.config._ -import com.twitter.recosinjector.edges.SocialWriteEventToUserUserGraphBuilder -import com.twitter.recosinjector.edges.TimelineEventToUserTweetEntityGraphBuilder -import com.twitter.recosinjector.edges.TweetEventToUserTweetEntityGraphBuilder -import com.twitter.recosinjector.edges.TweetEventToUserUserGraphBuilder -import com.twitter.recosinjector.edges.UnifiedUserActionToUserVideoGraphBuilder -import com.twitter.recosinjector.edges.UnifiedUserActionToUserAdGraphBuilder -import com.twitter.recosinjector.edges.UnifiedUserActionToUserTweetGraphPlusBuilder -import com.twitter.recosinjector.edges.UserTweetEntityEdgeBuilder -import com.twitter.recosinjector.event_processors.SocialWriteEventProcessor -import com.twitter.recosinjector.event_processors.TimelineEventProcessor -import com.twitter.recosinjector.event_processors.TweetEventProcessor -import com.twitter.recosinjector.publishers.KafkaEventPublisher -import com.twitter.recosinjector.uua_processors.UnifiedUserActionProcessor -import com.twitter.recosinjector.uua_processors.UnifiedUserActionsConsumer -import com.twitter.server.logging.{Logging => JDK14Logging} -import com.twitter.server.Deciderable -import com.twitter.server.TwitterServer -import com.twitter.socialgraph.thriftscala.WriteEvent -import com.twitter.timelineservice.thriftscala.{Event => TimelineEvent} -import com.twitter.tweetypie.thriftscala.TweetEvent -import com.twitter.util.Await -import com.twitter.util.Duration -import java.util.concurrent.TimeUnit - -object Main extends 
TwitterServer with JDK14Logging with Deciderable { self => - - implicit val stats: StatsReceiver = statsReceiver - - private val dataCenter: Flag[String] = flag("service.cluster", "atla", "Data Center") - private val serviceRole: Flag[String] = flag("service.role", "Service Role") - private val serviceEnv: Flag[String] = flag("service.env", "Service Env") - private val serviceName: Flag[String] = flag("service.name", "Service Name") - private val shardId = flag("shardId", 0, "Shard ID") - private val numShards = flag("numShards", 1, "Number of shards for this service") - private val truststoreLocation = - flag[String]("truststore_location", "", "Truststore file location") - - def main(): Unit = { - val serviceIdentifier = ServiceIdentifier( - role = serviceRole(), - service = serviceName(), - environment = serviceEnv(), - zone = dataCenter() - ) - println("ServiceIdentifier = " + serviceIdentifier.toString) - log.info("ServiceIdentifier = " + serviceIdentifier.toString) - - val shard = shardId() - val numOfShards = numShards() - val environment = serviceEnv() - - implicit val config: DeployConfig = { - environment match { - case "prod" => ProdConfig(serviceIdentifier)(stats) - case "staging" | "devel" => StagingConfig(serviceIdentifier) - case env => throw new Exception(s"Unknown environment $env") - } - } - - // Initialize the config and wait for initialization to finish - Await.ready(config.init()) - - log.info( - "Starting Recos Injector: environment %s, clientId %s", - environment, - config.recosInjectorThriftClientId - ) - log.info("Starting shard Id: %d of %d shards...".format(shard, numOfShards)) - - // Client wrappers - val cache = new RecosHoseEntitiesCache(config.recosInjectorCoreSvcsCacheClient) - val gizmoduck = new Gizmoduck(config.userStore) - val socialGraph = new SocialGraph(config.socialGraphIdStore) - val tweetypie = new Tweetypie(config.tweetyPieStore) - val urlResolver = new UrlResolver(config.urlInfoStore) - - // Edge builders - val 
userTweetEntityEdgeBuilder = new UserTweetEntityEdgeBuilder(cache, urlResolver) - - // Publishers - val kafkaEventPublisher = KafkaEventPublisher( - "/s/kafka/recommendations:kafka-tls", - config.outputKafkaTopicPrefix, - config.recosInjectorThriftClientId, - truststoreLocation()) - - // Message Builders - val socialWriteToUserUserMessageBuilder = - new SocialWriteEventToUserUserGraphBuilder()( - statsReceiver.scope("SocialWriteEventToUserUserGraphBuilder") - ) - - val timelineToUserTweetEntityMessageBuilder = new TimelineEventToUserTweetEntityGraphBuilder( - userTweetEntityEdgeBuilder = userTweetEntityEdgeBuilder - )(statsReceiver.scope("TimelineEventToUserTweetEntityGraphBuilder")) - - val tweetEventToUserTweetEntityGraphBuilder = new TweetEventToUserTweetEntityGraphBuilder( - userTweetEntityEdgeBuilder = userTweetEntityEdgeBuilder, - tweetCreationStore = config.tweetCreationStore, - decider = config.recosInjectorDecider - )(statsReceiver.scope("TweetEventToUserTweetEntityGraphBuilder")) - - val socialWriteEventProcessor = new SocialWriteEventProcessor( - eventBusStreamName = s"recos_injector_social_write_event_$environment", - thriftStruct = WriteEvent, - serviceIdentifier = serviceIdentifier, - kafkaEventPublisher = kafkaEventPublisher, - userUserGraphTopic = KafkaEventPublisher.UserUserTopic, - userUserGraphMessageBuilder = socialWriteToUserUserMessageBuilder - )(statsReceiver.scope("SocialWriteEventProcessor")) - - val tweetToUserUserMessageBuilder = new TweetEventToUserUserGraphBuilder()( - statsReceiver.scope("TweetEventToUserUserGraphBuilder") - ) - - val unifiedUserActionToUserVideoGraphBuilder = new UnifiedUserActionToUserVideoGraphBuilder( - userTweetEntityEdgeBuilder = userTweetEntityEdgeBuilder - )(statsReceiver.scope("UnifiedUserActionToUserVideoGraphBuilder")) - - val unifiedUserActionToUserAdGraphBuilder = new UnifiedUserActionToUserAdGraphBuilder( - userTweetEntityEdgeBuilder = userTweetEntityEdgeBuilder - 
)(statsReceiver.scope("UnifiedUserActionToUserAdGraphBuilder")) - - val unifiedUserActionToUserTweetGraphPlusBuilder = - new UnifiedUserActionToUserTweetGraphPlusBuilder( - userTweetEntityEdgeBuilder = userTweetEntityEdgeBuilder - )(statsReceiver.scope("UnifiedUserActionToUserTweetGraphPlusBuilder")) - - // Processors - val tweetEventProcessor = new TweetEventProcessor( - eventBusStreamName = s"recos_injector_tweet_events_$environment", - thriftStruct = TweetEvent, - serviceIdentifier = serviceIdentifier, - userUserGraphMessageBuilder = tweetToUserUserMessageBuilder, - userUserGraphTopic = KafkaEventPublisher.UserUserTopic, - userTweetEntityGraphMessageBuilder = tweetEventToUserTweetEntityGraphBuilder, - userTweetEntityGraphTopic = KafkaEventPublisher.UserTweetEntityTopic, - kafkaEventPublisher = kafkaEventPublisher, - socialGraph = socialGraph, - tweetypie = tweetypie, - gizmoduck = gizmoduck - )(statsReceiver.scope("TweetEventProcessor")) - - val timelineEventProcessor = new TimelineEventProcessor( - eventBusStreamName = s"recos_injector_timeline_events_prototype_$environment", - thriftStruct = TimelineEvent, - serviceIdentifier = serviceIdentifier, - kafkaEventPublisher = kafkaEventPublisher, - userTweetEntityGraphTopic = KafkaEventPublisher.UserTweetEntityTopic, - userTweetEntityGraphMessageBuilder = timelineToUserTweetEntityMessageBuilder, - decider = config.recosInjectorDecider, - gizmoduck = gizmoduck, - tweetypie = tweetypie - )(statsReceiver.scope("TimelineEventProcessor")) - - val eventBusProcessors = Seq( - timelineEventProcessor, - socialWriteEventProcessor, - tweetEventProcessor - ) - - val uuaProcessor = new UnifiedUserActionProcessor( - gizmoduck = gizmoduck, - tweetypie = tweetypie, - kafkaEventPublisher = kafkaEventPublisher, - userVideoGraphTopic = KafkaEventPublisher.UserVideoTopic, - userVideoGraphBuilder = unifiedUserActionToUserVideoGraphBuilder, - userAdGraphTopic = KafkaEventPublisher.UserAdTopic, - userAdGraphBuilder = 
unifiedUserActionToUserAdGraphBuilder, - userTweetGraphPlusTopic = KafkaEventPublisher.UserTweetPlusTopic, - userTweetGraphPlusBuilder = unifiedUserActionToUserTweetGraphPlusBuilder)( - statsReceiver.scope("UnifiedUserActionProcessor")) - - val uuaConsumer = new UnifiedUserActionsConsumer(uuaProcessor, truststoreLocation()) - - // Start-up init and graceful shutdown setup - - // wait a bit for services to be ready - Thread.sleep(5000L) - - log.info("Starting the event processors") - eventBusProcessors.foreach(_.start()) - - log.info("Starting the uua processors") - uuaConsumer.atLeastOnceProcessor.start() - - this.addAdminRoute(ElfOwlFilter.getPostbackRoute()) - - onExit { - log.info("Shutting down the event processors") - eventBusProcessors.foreach(_.stop()) - log.info("Shutting down the uua processors") - uuaConsumer.atLeastOnceProcessor.close() - log.info("done exit") - } - - // Wait on the thriftServer so that shutdownTimeout is respected. - Await.result(adminHttpServer) - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/BUILD b/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/BUILD deleted file mode 100644 index 9b0d8be74..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/BUILD +++ /dev/null @@ -1,20 +0,0 @@ -scala_library( - platform = "java11", - strict_deps = False, - tags = ["bazel-compatible"], - dependencies = [ - "3rdparty/jvm/com/twitter/storehaus:core", - "finagle/finagle-memcached/src/main/scala", - "finagle/finagle-stats", - "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/base", - "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/util", - "servo/repo/src/main/scala", - "src/thrift/com/twitter/gizmoduck:thrift-scala", - "src/thrift/com/twitter/gizmoduck:user-thrift-scala", - "src/thrift/com/twitter/recos:recos-internal-scala", - "src/thrift/com/twitter/socialgraph:thrift-scala", - 
"src/thrift/com/twitter/tweetypie:service-scala", - "stitch/stitch-tweetypie/src/main/scala", - "util/util-logging/src/main/scala", - ], -) diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/Gizmoduck.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/Gizmoduck.scala deleted file mode 100644 index f6806a7f5..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/Gizmoduck.scala +++ /dev/null @@ -1,26 +0,0 @@ -package com.twitter.recosinjector.clients - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.gizmoduck.thriftscala.User -import com.twitter.logging.Logger -import com.twitter.storehaus.ReadableStore -import com.twitter.util.Future - -class Gizmoduck( - userStore: ReadableStore[Long, User] -)( - implicit statsReceiver: StatsReceiver) { - private val log = Logger() - private val stats = statsReceiver.scope(this.getClass.getSimpleName) - - def getUser(userId: Long): Future[Option[User]] = { - userStore - .get(userId) - .rescue { - case e => - stats.scope("getUserFailure").counter(e.getClass.getSimpleName).incr() - log.error(s"Failed with message ${e.toString}") - Future.None - } - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/RecosHoseEntitiesCache.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/RecosHoseEntitiesCache.scala deleted file mode 100644 index c90ac5cc7..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/RecosHoseEntitiesCache.scala +++ /dev/null @@ -1,137 +0,0 @@ -package com.twitter.recosinjector.clients - -import com.twitter.conversions.DurationOps._ -import com.twitter.finagle.memcached.Client -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.io.Buf -import com.twitter.recos.internal.thriftscala.{RecosHoseEntities, RecosHoseEntity} -import com.twitter.servo.cache.ThriftSerializer -import 
com.twitter.util.{Duration, Future, Time} -import org.apache.thrift.protocol.TBinaryProtocol - -case class CacheEntityEntry( - cachePrefix: String, - hashedEntityId: Int, - entity: String) { - val fullKey: String = cachePrefix + hashedEntityId -} - -object RecosHoseEntitiesCache { - val EntityTTL: Duration = 30.hours - val EntitiesSerializer = - new ThriftSerializer[RecosHoseEntities](RecosHoseEntities, new TBinaryProtocol.Factory()) - - val HashtagPrefix: String = "h" - val UrlPrefix: String = "u" -} - -/** - * A cache layer to store entities. - * Graph services like user_tweet_entity_graph and user_url_graph store user interactions with - * entities in a tweet, such as HashTags and URLs. These entities are string values that can be - * potentially very big. Therefore, we instead store a hashed id in the graph edge, and keep a - * (hashedId -> entity) mapping in this cache. The actual entity values can be recovered - * by the graph service at serving time using this cache. - */ -class RecosHoseEntitiesCache(client: Client) { - import RecosHoseEntitiesCache._ - - private def isEntityWithinTTL(entity: RecosHoseEntity, ttlInMillis: Long): Boolean = { - entity.timestamp.exists(timestamp => Time.now.inMilliseconds - timestamp <= ttlInMillis) - } - - /** - * Add a new RecosHoseEntity into RecosHoseEntities - */ - private def updateRecosHoseEntities( - existingEntitiesOpt: Option[RecosHoseEntities], - newEntityString: String, - stats: StatsReceiver - ): RecosHoseEntities = { - val existingEntities = existingEntitiesOpt.map(_.entities).getOrElse(Nil) - - // Discard expired and duplicate existing entities - val validExistingEntities = existingEntities - .filter(entity => isEntityWithinTTL(entity, EntityTTL.inMillis)) - .filter(_.entity != newEntityString) - - val newRecosHoseEntity = RecosHoseEntity(newEntityString, Some(Time.now.inMilliseconds)) - RecosHoseEntities(validExistingEntities :+ newRecosHoseEntity) - } - - private def getRecosHoseEntitiesCache( - cacheEntries: 
Seq[CacheEntityEntry], - stats: StatsReceiver - ): Future[Map[String, Option[RecosHoseEntities]]] = { - client - .get(cacheEntries.map(_.fullKey)) - .map(_.map { - case (cacheKey, buf) => - val recosHoseEntitiesTry = EntitiesSerializer.from(Buf.ByteArray.Owned.extract(buf)) - if (recosHoseEntitiesTry.isThrow) { - stats.counter("cache_get_deserialization_failure").incr() - } - cacheKey -> recosHoseEntitiesTry.toOption - }) - .onSuccess { _ => stats.counter("get_cache_success").incr() } - .onFailure { ex => - stats.scope("get_cache_failure").counter(ex.getClass.getSimpleName).incr() - } - } - - private def putRecosHoseEntitiesCache( - cacheKey: String, - recosHoseEntities: RecosHoseEntities, - stats: StatsReceiver - ): Unit = { - val serialized = EntitiesSerializer.to(recosHoseEntities) - if (serialized.isThrow) { - stats.counter("cache_put_serialization_failure").incr() - } - serialized.toOption.map { bytes => - client - .set(cacheKey, 0, EntityTTL.fromNow, Buf.ByteArray.Owned(bytes)) - .onSuccess { _ => stats.counter("put_cache_success").incr() } - .onFailure { ex => - stats.scope("put_cache_failure").counter(ex.getClass.getSimpleName).incr() - } - } - } - - /** - * Store a list of new entities into the cache by their cacheKeys, and remove expired/invalid - * values in the existing cache entries at the same time - */ - def updateEntitiesCache( - newCacheEntries: Seq[CacheEntityEntry], - stats: StatsReceiver - ): Future[Unit] = { - stats.counter("update_cache_request").incr() - getRecosHoseEntitiesCache(newCacheEntries, stats) - .map { existingCacheEntries => - newCacheEntries.foreach { newCacheEntry => - val fullKey = newCacheEntry.fullKey - val existingRecosHoseEntities = existingCacheEntries.get(fullKey).flatten - stats.stat("num_existing_entities").add(existingRecosHoseEntities.size) - if (existingRecosHoseEntities.isEmpty) { - stats.counter("existing_entities_empty").incr() - } - - val updatedRecosHoseEntities = updateRecosHoseEntities( - 
existingRecosHoseEntities, - newCacheEntry.entity, - stats - ) - stats.stat("num_updated_entities").add(updatedRecosHoseEntities.entities.size) - - if (updatedRecosHoseEntities.entities.nonEmpty) { - putRecosHoseEntitiesCache(fullKey, updatedRecosHoseEntities, stats) - } - } - } - .onSuccess { _ => stats.counter("update_cache_success").incr() } - .onFailure { ex => - stats.scope("update_cache_failure").counter(ex.getClass.getSimpleName).incr() - } - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/SocialGraph.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/SocialGraph.scala deleted file mode 100644 index f721db2e7..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/SocialGraph.scala +++ /dev/null @@ -1,80 +0,0 @@ -package com.twitter.recosinjector.clients - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.logging.Logger -import com.twitter.socialgraph.thriftscala._ -import com.twitter.storehaus.ReadableStore -import com.twitter.util.Future - -class SocialGraph( - socialGraphIdStore: ReadableStore[IdsRequest, IdsResult] -)( - implicit statsReceiver: StatsReceiver) { - import SocialGraph._ - private val log = Logger() - - private val followedByNotMutedByStats = statsReceiver.scope("followedByNotMutedBy") - - private def fetchIdsFromSocialGraph( - userId: Long, - ids: Seq[Long], - relationshipTypes: Map[RelationshipType, Boolean], - lookupContext: Option[LookupContext] = IncludeInactiveUnionLookupContext, - stats: StatsReceiver - ): Future[Seq[Long]] = { - if (ids.isEmpty) { - stats.counter("fetchIdsEmpty").incr() - Future.Nil - } else { - val relationships = relationshipTypes.map { - case (relationshipType, hasRelationship) => - SrcRelationship( - source = userId, - relationshipType = relationshipType, - hasRelationship = hasRelationship, - targets = Some(ids) - ) - }.toSeq - val idsRequest = IdsRequest( - relationships = relationships, - 
pageRequest = SelectAllPageRequest, - context = lookupContext - ) - socialGraphIdStore - .get(idsRequest) - .map { _.map(_.ids).getOrElse(Nil) } - .rescue { - case e => - stats.scope("fetchIdsFailure").counter(e.getClass.getSimpleName).incr() - log.error(s"Failed with message ${e.toString}") - Future.Nil - } - } - } - - // which of the users in candidates follow userId and have not muted userId - def followedByNotMutedBy(userId: Long, candidates: Seq[Long]): Future[Seq[Long]] = { - fetchIdsFromSocialGraph( - userId, - candidates, - FollowedByNotMutedRelationships, - IncludeInactiveLookupContext, - followedByNotMutedByStats - ) - } - -} - -object SocialGraph { - val SelectAllPageRequest = Some(PageRequest(selectAll = Some(true))) - - val IncludeInactiveLookupContext = Some(LookupContext(includeInactive = true)) - val IncludeInactiveUnionLookupContext = Some( - LookupContext(includeInactive = true, performUnion = Some(true)) - ) - - val FollowedByNotMutedRelationships: Map[RelationshipType, Boolean] = Map( - RelationshipType.FollowedBy -> true, - RelationshipType.MutedBy -> false - ) -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/Tweetypie.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/Tweetypie.scala deleted file mode 100644 index 9e81f6121..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/Tweetypie.scala +++ /dev/null @@ -1,30 +0,0 @@ -package com.twitter.recosinjector.clients - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.stitch.tweetypie.TweetyPie.{TweetyPieException, TweetyPieResult} -import com.twitter.storehaus.ReadableStore -import com.twitter.tweetypie.thriftscala.Tweet -import com.twitter.util.Future - -class Tweetypie( - tweetyPieStore: ReadableStore[Long, TweetyPieResult] -)( - implicit statsReceiver: StatsReceiver) { - private val stats = statsReceiver.scope(this.getClass.getSimpleName) - private val failureStats = 
stats.scope("getTweetFailure") - - def getTweet(tweetId: Long): Future[Option[Tweet]] = { - tweetyPieStore - .get(tweetId) - .map { _.map(_.tweet) } - .rescue { - case e: TweetyPieException => - // Usually results from trying to query a protected or unsafe tweet - failureStats.scope("TweetyPieException").counter(e.result.tweetState.toString).incr() - Future.None - case e => - failureStats.counter(e.getClass.getSimpleName).incr() - Future.None - } - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/UrlResolver.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/UrlResolver.scala deleted file mode 100644 index 95817181d..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/clients/UrlResolver.scala +++ /dev/null @@ -1,105 +0,0 @@ -package com.twitter.recosinjector.clients - -import com.twitter.conversions.DurationOps._ -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.finagle.util.DefaultTimer -import com.twitter.frigate.common.util.{SnowflakeUtils, UrlInfo} -import com.twitter.storehaus.{FutureOps, ReadableStore} -import com.twitter.util.{Duration, Future, Timer} - -class UrlResolver( - urlInfoStore: ReadableStore[String, UrlInfo] -)( - implicit statsReceiver: StatsReceiver) { - private val EmptyFutureMap = Future.value(Map.empty[String, String]) - private val stats = statsReceiver.scope(this.getClass.getSimpleName) - private val twitterResolvedUrlCounter = stats.counter("twitterResolvedUrl") - private val resolvedUrlCounter = stats.counter("resolvedUrl") - private val noResolvedUrlCounter = stats.counter("noResolvedUrl") - - private val numNoDelayCounter = stats.counter("urlResolver_no_delay") - private val numDelayCounter = stats.counter("urlResolver_delay") - - implicit val timer: Timer = DefaultTimer - - /** - * Get the resolved URL map of the input raw URLs - * - * @param rawUrls list of raw URLs to query - * @return map of raw URL to resolved URL - */ 
- def getResolvedUrls(rawUrls: Set[String]): Future[Map[String, String]] = { - FutureOps - .mapCollect(urlInfoStore.multiGet[String](rawUrls)) - .map { resolvedUrlsMap => - resolvedUrlsMap.flatMap { - case ( - url, - Some( - UrlInfo( - Some(resolvedUrl), - Some(_), - Some(domain), - _, - _, - _, - _, - Some(_), - _, - _, - _, - _))) => - if (domain == "Twitter") { // Filter out Twitter based URLs - twitterResolvedUrlCounter.incr() - None - } else { - resolvedUrlCounter.incr() - Some(url -> resolvedUrl) - } - case _ => - noResolvedUrlCounter.incr() - None - } - } - } - - /** - * Get resolved url maps given a list of urls, grouping urls that point to the same webpage - */ - def getResolvedUrls(urls: Seq[String], tweetId: Long): Future[Map[String, String]] = { - if (urls.isEmpty) { - EmptyFutureMap - } else { - Future - .sleep(getUrlResolverDelayDuration(tweetId)) - .before(getResolvedUrls(urls.toSet)) - } - } - - /** - * Given a tweet, return the amount of delay needed before attempting to resolve the Urls - */ - private def getUrlResolverDelayDuration( - tweetId: Long - ): Duration = { - val urlResolverDelaySinceCreation = 12.seconds - val urlResolverDelayDuration = 4.seconds - val noDelay = 0.seconds - - // Check whether the tweet was created more than the specified delay duration before now. - // If the tweet ID is not based on Snowflake, this is false, and the delay is applied. 
- val isCreatedBeforeDelayThreshold = SnowflakeUtils - .tweetCreationTime(tweetId) - .map(_.untilNow) - .exists(_ > urlResolverDelaySinceCreation) - - if (isCreatedBeforeDelayThreshold) { - numNoDelayCounter.incr() - noDelay - } else { - numDelayCounter.incr() - urlResolverDelayDuration - } - } - -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/BUILD b/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/BUILD deleted file mode 100644 index 90dfe6868..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/BUILD +++ /dev/null @@ -1,36 +0,0 @@ -scala_library( - platform = "java11", - strict_deps = False, - tags = ["bazel-compatible"], - dependencies = [ - "3rdparty/jvm/com/twitter/bijection:scrooge", - "3rdparty/jvm/com/twitter/storehaus:core", - "abdecider", - "decider/src/main/scala", - "finagle/finagle-memcached/src/main/scala", - "finagle/finagle-stats", - "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store", - "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/util", - "hermit/hermit-core:store", - "hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common", - "hermit/hermit-core/src/main/scala/com/twitter/hermit/store/gizmoduck", - "hermit/hermit-core/src/main/scala/com/twitter/hermit/store/tweetypie", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/decider", - "scribelib/validators/src/main/scala/com/twitter/scribelib/validators", - "src/scala/com/twitter/storehaus_internal/memcache", - "src/scala/com/twitter/storehaus_internal/memcache/config", - "src/scala/com/twitter/storehaus_internal/util", - "src/thrift/com/twitter/gizmoduck:thrift-java", - "src/thrift/com/twitter/gizmoduck:thrift-scala", - "src/thrift/com/twitter/gizmoduck:user-thrift-scala", - "src/thrift/com/twitter/socialgraph:thrift-scala", - "src/thrift/com/twitter/spam/rtf:safety-level-scala", - "src/thrift/com/twitter/tweetypie:service-scala", - 
"stitch/stitch-core", - "stitch/stitch-socialgraph", - "stitch/stitch-storehaus/src/main/scala", - "stitch/stitch-tweetypie/src/main/scala", - "util/util-hashing/src/main/scala", - "util/util-logging/src/main/scala", - ], -) diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/CacheConfig.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/CacheConfig.scala deleted file mode 100644 index dae1aba12..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/CacheConfig.scala +++ /dev/null @@ -1,23 +0,0 @@ -package com.twitter.recosinjector.config - -import com.twitter.finagle.memcached.Client -import com.twitter.finagle.mtls.authentication.ServiceIdentifier -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.storehaus_internal.memcache.MemcacheStore -import com.twitter.storehaus_internal.util.{ClientName, ZkEndPoint} - -trait CacheConfig { - implicit def statsReceiver: StatsReceiver - - def serviceIdentifier: ServiceIdentifier - - def recosInjectorCoreSvcsCacheDest: String - - val recosInjectorCoreSvcsCacheClient: Client = MemcacheStore.memcachedClient( - name = ClientName("memcache-recos-injector"), - dest = ZkEndPoint(recosInjectorCoreSvcsCacheDest), - statsReceiver = statsReceiver, - serviceIdentifier = serviceIdentifier - ) - -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/Config.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/Config.scala deleted file mode 100644 index c2aa209a8..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/Config.scala +++ /dev/null @@ -1,41 +0,0 @@ -package com.twitter.recosinjector.config - -import com.twitter.finagle.mtls.authentication.ServiceIdentifier -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.finagle.thrift.ClientId -import com.twitter.frigate.common.store.TweetCreationTimeMHStore -import 
com.twitter.frigate.common.util.UrlInfo -import com.twitter.gizmoduck.thriftscala.User -import com.twitter.recosinjector.decider.RecosInjectorDecider -import com.twitter.socialgraph.thriftscala.{IdsRequest, IdsResult} -import com.twitter.stitch.tweetypie.TweetyPie.TweetyPieResult -import com.twitter.storehaus.ReadableStore -import com.twitter.util.Future - -trait Config { self => - implicit def statsReceiver: StatsReceiver - - // ReadableStores - def tweetyPieStore: ReadableStore[Long, TweetyPieResult] - - def userStore: ReadableStore[Long, User] - - def socialGraphIdStore: ReadableStore[IdsRequest, IdsResult] - - def urlInfoStore: ReadableStore[String, UrlInfo] - - // Manhattan stores - def tweetCreationStore: TweetCreationTimeMHStore - - // Decider - def recosInjectorDecider: RecosInjectorDecider - - // Constants - def recosInjectorThriftClientId: ClientId - - def serviceIdentifier: ServiceIdentifier - - def outputKafkaTopicPrefix: String - - def init(): Future[Unit] = Future.Done -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/DeployConfig.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/DeployConfig.scala deleted file mode 100644 index 1e547681b..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/DeployConfig.scala +++ /dev/null @@ -1,215 +0,0 @@ -package com.twitter.recosinjector.config - -import com.twitter.bijection.scrooge.BinaryScalaCodec -import com.twitter.conversions.DurationOps._ -import com.twitter.finagle.client.ClientRegistry -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.frigate.common.store.TweetCreationTimeMHStore -import com.twitter.frigate.common.util.Finagle._ -import com.twitter.frigate.common.util.{UrlInfo, UrlInfoInjection, UrlResolver} -import com.twitter.gizmoduck.thriftscala.{LookupContext, QueryFields, User, UserService} -import com.twitter.hermit.store.common.{ObservedCachedReadableStore, 
ObservedMemcachedReadableStore} -import com.twitter.hermit.store.gizmoduck.GizmoduckUserStore -import com.twitter.hermit.store.tweetypie.TweetyPieStore -import com.twitter.logging.Logger -import com.twitter.pink_floyd.thriftscala.{ClientIdentifier, Storer} -import com.twitter.socialgraph.thriftscala.{IdsRequest, SocialGraphService} -import com.twitter.spam.rtf.thriftscala.SafetyLevel -import com.twitter.stitch.socialgraph.SocialGraph -import com.twitter.stitch.storehaus.ReadableStoreOfStitch -import com.twitter.stitch.tweetypie.TweetyPie.TweetyPieResult -import com.twitter.storage.client.manhattan.kv.{ - ManhattanKVClient, - ManhattanKVClientMtlsParams, - ManhattanKVEndpointBuilder -} -import com.twitter.storehaus.ReadableStore -import com.twitter.tweetypie.thriftscala.{GetTweetOptions, TweetService} -import com.twitter.util.Future - -/* - * Any finagle clients should not be defined as lazy. If defined lazy, - * ClientRegistry.expAllRegisteredClientsResolved() call in init will not ensure that the clients - * are active before thrift endpoint is active. We want the clients to be active, because zookeeper - * resolution triggered by first request(s) might result in the request(s) failing. 
- */ -trait DeployConfig extends Config with CacheConfig { - implicit def statsReceiver: StatsReceiver - - def log: Logger - - // Clients - val gizmoduckClient = new UserService.FinagledClient( - readOnlyThriftService( - "gizmoduck", - "/s/gizmoduck/gizmoduck", - statsReceiver, - recosInjectorThriftClientId, - requestTimeout = 450.milliseconds, - mTLSServiceIdentifier = Some(serviceIdentifier) - ) - ) - val tweetyPieClient = new TweetService.FinagledClient( - readOnlyThriftService( - "tweetypie", - "/s/tweetypie/tweetypie", - statsReceiver, - recosInjectorThriftClientId, - requestTimeout = 450.milliseconds, - mTLSServiceIdentifier = Some(serviceIdentifier) - ) - ) - - val sgsClient = new SocialGraphService.FinagledClient( - readOnlyThriftService( - "socialgraph", - "/s/socialgraph/socialgraph", - statsReceiver, - recosInjectorThriftClientId, - requestTimeout = 450.milliseconds, - mTLSServiceIdentifier = Some(serviceIdentifier) - ) - ) - - val pinkStoreClient = new Storer.FinagledClient( - readOnlyThriftService( - "pink_store", - "/s/spiderduck/pink-store", - statsReceiver, - recosInjectorThriftClientId, - requestTimeout = 450.milliseconds, - mTLSServiceIdentifier = Some(serviceIdentifier) - ) - ) - - // Stores - private val _gizmoduckStore = { - val queryFields: Set[QueryFields] = Set( - QueryFields.Discoverability, - QueryFields.Labels, - QueryFields.Safety - ) - val context: LookupContext = LookupContext( - includeDeactivated = true, - safetyLevel = Some(SafetyLevel.Recommendations) - ) - - GizmoduckUserStore( - client = gizmoduckClient, - queryFields = queryFields, - context = context, - statsReceiver = statsReceiver - ) - } - - override val userStore: ReadableStore[Long, User] = { - // memcache based cache - ObservedMemcachedReadableStore.fromCacheClient( - backingStore = _gizmoduckStore, - cacheClient = recosInjectorCoreSvcsCacheClient, - ttl = 2.hours - )( - valueInjection = BinaryScalaCodec(User), - statsReceiver = statsReceiver.scope("UserStore"), - 
keyToString = { k: Long => - s"usri/$k" - } - ) - } - - /** - * TweetyPie store, used to fetch tweet objects when unavailable, and also as a source of - * tweet SafetyLevel filtering. - * Note: we do NOT cache TweetyPie calls, as it makes tweet SafetyLevel filtering less accurate. - * TweetyPie QPS is < 20K/cluster. - * More info is here: - * https://cgit.twitter.biz/source/tree/src/thrift/com/twitter/spam/rtf/safety_level.thrift - */ - override val tweetyPieStore: ReadableStore[Long, TweetyPieResult] = { - val getTweetOptions = Some( - GetTweetOptions( - includeCards = true, - safetyLevel = Some(SafetyLevel.RecosWritePath) - ) - ) - TweetyPieStore( - tweetyPieClient, - getTweetOptions, - convertExceptionsToNotFound = false // Do not suppress TweetyPie errors. Leave it to caller - ) - } - - private val _urlInfoStore = { - //Initialize pink store client, for parsing url - UrlResolver( - pinkStoreClient, - statsReceiver.scope("urlFetcher"), - clientId = ClientIdentifier.Recoshose - ) - } - - override val urlInfoStore: ReadableStore[String, UrlInfo] = { - // memcache based cache - val memcachedStore = ObservedMemcachedReadableStore.fromCacheClient( - backingStore = _urlInfoStore, - cacheClient = recosInjectorCoreSvcsCacheClient, - ttl = 2.hours - )( - valueInjection = UrlInfoInjection, - statsReceiver = statsReceiver.scope("UrlInfoStore"), - keyToString = { k: String => - s"uisri/$k" - } - ) - - ObservedCachedReadableStore.from( - memcachedStore, - ttl = 1.minutes, - maxKeys = 1e5.toInt, - windowSize = 10000L, - cacheName = "url_store_in_proc_cache" - )(statsReceiver.scope("url_store_in_proc_cache")) - } - - override val socialGraphIdStore = ReadableStoreOfStitch { idsRequest: IdsRequest => - SocialGraph(sgsClient).ids(idsRequest) - } - - /** - * MH Store for updating the last time user created a tweet - */ - val tweetCreationStore: TweetCreationTimeMHStore = { - val client = ManhattanKVClient( - appId = "recos_tweet_creation_info", - dest = 
"/s/manhattan/omega.native-thrift", - mtlsParams = ManhattanKVClientMtlsParams(serviceIdentifier) - ) - - val endpoint = ManhattanKVEndpointBuilder(client) - .defaultMaxTimeout(700.milliseconds) - .statsReceiver( - statsReceiver - .scope(serviceIdentifier.zone) - .scope(serviceIdentifier.environment) - .scope("recos_injector_tweet_creation_info_store") - ) - .build() - - val dataset = if (serviceIdentifier.environment == "prod") { - "recos_injector_tweet_creation_info" - } else { - "recos_injector_tweet_creation_info_staging" - } - - new TweetCreationTimeMHStore( - cluster = serviceIdentifier.zone, - endpoint = endpoint, - dataset = dataset, - writeTtl = Some(14.days), - statsReceiver.scope("recos_injector_tweet_creation_info_store") - ) - } - - // wait for all serversets to populate - override def init(): Future[Unit] = ClientRegistry.expAllRegisteredClientsResolved().unit -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/ProdConfig.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/ProdConfig.scala deleted file mode 100644 index 7ead5c34d..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/ProdConfig.scala +++ /dev/null @@ -1,29 +0,0 @@ -package com.twitter.recosinjector.config - -import com.twitter.finagle.mtls.authentication.ServiceIdentifier -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.finagle.thrift.ClientId -import com.twitter.logging.Logger -import com.twitter.recosinjector.decider.RecosInjectorDecider - -case class ProdConfig( - override val serviceIdentifier: ServiceIdentifier -)(implicit val statsReceiver: StatsReceiver) extends { - // Due to trait initialization logic in Scala, any abstract members declared in Config or - // DeployConfig should be declared in this block. Otherwise the abstract member might initialize - // to null if invoked before before object creation finishing. 
- - val recosInjectorThriftClientId = ClientId("recos-injector.prod") - - val outputKafkaTopicPrefix = "recos_injector" - - val log = Logger("ProdConfig") - - val recosInjectorCoreSvcsCacheDest = "/srv#/prod/local/cache/recos_metadata" - - val recosInjectorDecider = RecosInjectorDecider( - isProd = true, - dataCenter = serviceIdentifier.zone - ) - -} with DeployConfig diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/StagingConfig.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/StagingConfig.scala deleted file mode 100644 index 1caabea8e..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/config/StagingConfig.scala +++ /dev/null @@ -1,33 +0,0 @@ -package com.twitter.recosinjector.config - -import com.twitter.finagle.mtls.authentication.ServiceIdentifier -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.finagle.thrift.ClientId -import com.twitter.logging.Logger -import com.twitter.recosinjector.decider.RecosInjectorDecider - -case class StagingConfig( - override val serviceIdentifier: ServiceIdentifier -)( - implicit val statsReceiver: StatsReceiver) - extends { - // Due to trait initialization logic in Scala, any abstract members declared in Config or - // DeployConfig should be declared in this block. Otherwise the abstract member might initialize - // to null if invoked before before object creation finishing. 
- - val recosInjectorThriftClientId = ClientId("recos-injector.staging") - - val outputKafkaTopicPrefix = "staging_recos_injector" - - val log = Logger("StagingConfig") - - val recosInjectorCoreSvcsCacheDest = "/srv#/test/local/cache/twemcache_recos" - - val recosInjectorDecider = RecosInjectorDecider( - isProd = false, - dataCenter = serviceIdentifier.zone - ) - - val abDeciderLoggerNode = "staging_abdecider_scribe" - -} with DeployConfig diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/decider/BUILD b/recos-injector/server/src/main/scala/com/twitter/recosinjector/decider/BUILD deleted file mode 100644 index 0a728c357..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/decider/BUILD +++ /dev/null @@ -1,7 +0,0 @@ -scala_library( - platform = "java11", - tags = ["bazel-compatible"], - dependencies = [ - "decider/src/main/scala", - ], -) diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/decider/RecosInjectorDecider.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/decider/RecosInjectorDecider.scala deleted file mode 100644 index d9bb86bf1..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/decider/RecosInjectorDecider.scala +++ /dev/null @@ -1,33 +0,0 @@ -package com.twitter.recosinjector.decider - -import com.twitter.decider.{Decider, DeciderFactory, RandomRecipient, Recipient} - -case class RecosInjectorDecider(isProd: Boolean, dataCenter: String) { - lazy val decider: Decider = DeciderFactory( - Some("config/decider.yml"), - Some(getOverlayPath(isProd, dataCenter)) - )() - - private def getOverlayPath(isProd: Boolean, dataCenter: String): String = { - if (isProd) { - s"/usr/local/config/overlays/recos-injector/recos-injector/prod/$dataCenter/decider_overlay.yml" - } else { - s"/usr/local/config/overlays/recos-injector/recos-injector/staging/$dataCenter/decider_overlay.yml" - } - } - - def getDecider: Decider = decider - - def 
isAvailable(feature: String, recipient: Option[Recipient]): Boolean = { - decider.isAvailable(feature, recipient) - } - - def isAvailable(feature: String): Boolean = isAvailable(feature, Some(RandomRecipient)) -} - -object RecosInjectorDeciderConstants { - val TweetEventTransformerUserTweetEntityEdgesDecider = - "tweet_event_transformer_user_tweet_entity_edges" - val EnableEmitTweetEdgeFromReply = "enable_emit_tweet_edge_from_reply" - val EnableUnfavoriteEdge = "enable_unfavorite_edge" -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/BUILD b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/BUILD deleted file mode 100644 index c19b2d96e..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/BUILD +++ /dev/null @@ -1,23 +0,0 @@ -scala_library( - platform = "java11", - strict_deps = False, - tags = ["bazel-compatible"], - dependencies = [ - "3rdparty/jvm/com/twitter/storehaus:core", - "finagle/finagle-stats", - "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store", - "hermit/hermit-core/src/main/scala/com/twitter/hermit/predicate/socialgraph", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/clients", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/decider", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/filters", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/util", - "src/scala/com/twitter/recos/util:recos-util", - "src/thrift/com/twitter/recos:recos-injector-scala", - "src/thrift/com/twitter/recos:recos-internal-scala", - "src/thrift/com/twitter/socialgraph:thrift-scala", - "src/thrift/com/twitter/timelineservice/server/internal:thrift-scala", - "src/thrift/com/twitter/tweetypie:events-scala", - "src/thrift/com/twitter/tweetypie:tweet-scala", - ], -) diff --git 
a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/Edges.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/Edges.scala deleted file mode 100644 index e7ccd4fb0..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/Edges.scala +++ /dev/null @@ -1,87 +0,0 @@ -package com.twitter.recosinjector.edges - -import com.twitter.recos.internal.thriftscala.RecosHoseMessage -import com.twitter.recos.recos_injector.thriftscala.{Features, UserTweetAuthorGraphMessage} -import com.twitter.recos.util.Action.Action -import com.twitter.recosinjector.util.TweetDetails -import scala.collection.Map - -trait Edge { - // RecosHoseMessage is the thrift struct that the graphs consume. - def convertToRecosHoseMessage: RecosHoseMessage - - // UserTweetAuthorGraphMessage is the thrift struct that user_tweet_author_graph consumes. - def convertToUserTweetAuthorGraphMessage: UserTweetAuthorGraphMessage -} - -/** - * Edge corresponding to UserTweetEntityEdge. - * It captures user-tweet interactions: Create, Like, Retweet, Reply etc. 
- */ -case class UserTweetEntityEdge( - sourceUser: Long, - targetTweet: Long, - action: Action, - cardInfo: Option[Byte], - metadata: Option[Long], - entitiesMap: Option[Map[Byte, Seq[Int]]], - tweetDetails: Option[TweetDetails]) - extends Edge { - - override def convertToRecosHoseMessage: RecosHoseMessage = { - RecosHoseMessage( - leftId = sourceUser, - rightId = targetTweet, - action = action.id.toByte, - card = cardInfo, - entities = entitiesMap, - edgeMetadata = metadata - ) - } - - private def getFeatures(tweetDetails: TweetDetails): Features = { - Features( - hasPhoto = Some(tweetDetails.hasPhoto), - hasVideo = Some(tweetDetails.hasVideo), - hasUrl = Some(tweetDetails.hasUrl), - hasHashtag = Some(tweetDetails.hasHashtag) - ) - } - - override def convertToUserTweetAuthorGraphMessage: UserTweetAuthorGraphMessage = { - UserTweetAuthorGraphMessage( - leftId = sourceUser, - rightId = targetTweet, - action = action.id.toByte, - card = cardInfo, - authorId = tweetDetails.flatMap(_.authorId), - features = tweetDetails.map(getFeatures) - ) - } -} - -/** - * Edge corresponding to UserUserGraph. - * It captures user-user interactions: Follow, Mention, Mediatag. 
- */ -case class UserUserEdge( - sourceUser: Long, - targetUser: Long, - action: Action, - metadata: Option[Long]) - extends Edge { - override def convertToRecosHoseMessage: RecosHoseMessage = { - RecosHoseMessage( - leftId = sourceUser, - rightId = targetUser, - action = action.id.toByte, - edgeMetadata = metadata - ) - } - - override def convertToUserTweetAuthorGraphMessage: UserTweetAuthorGraphMessage = { - throw new RuntimeException( - "convertToUserTweetAuthorGraphMessage not implemented in UserUserEdge.") - } - -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/EventToMessageBuilder.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/EventToMessageBuilder.scala deleted file mode 100644 index 933bec61b..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/EventToMessageBuilder.scala +++ /dev/null @@ -1,82 +0,0 @@ -package com.twitter.recosinjector.edges - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.frigate.common.base.Stats.track -import com.twitter.util.Future - -/** - * This is the generic interface that converts incoming Events (ex. TweetEvent, FavEvent, etc) - * into Edge for a specific output graph. 
It applies the following flow: - * - * event -> update event stats -> build edges -> filter edges - * - * Top-level statistics are provided for each step, such as latency and number of events - */ -trait EventToMessageBuilder[Event, E <: Edge] { - implicit val statsReceiver: StatsReceiver - - private lazy val processEventStats = statsReceiver.scope("process_event") - private lazy val numEventsStats = statsReceiver.counter("num_process_event") - private lazy val rejectEventStats = statsReceiver.counter("num_reject_event") - private lazy val buildEdgesStats = statsReceiver.scope("build") - private lazy val numAllEdgesStats = buildEdgesStats.counter("num_all_edges") - private lazy val filterEdgesStats = statsReceiver.scope("filter") - private lazy val numValidEdgesStats = statsReceiver.counter("num_valid_edges") - private lazy val numRecosHoseMessageStats = statsReceiver.counter("num_RecosHoseMessage") - - /** - * Given an incoming event, process and convert it into a sequence of RecosHoseMessages - * @param event - * @return - */ - def processEvent(event: Event): Future[Seq[Edge]] = { - track(processEventStats) { - shouldProcessEvent(event).flatMap { - case true => - numEventsStats.incr() - updateEventStatus(event) - for { - allEdges <- track(buildEdgesStats)(buildEdges(event)) - filteredEdges <- track(filterEdgesStats)(filterEdges(event, allEdges)) - } yield { - numAllEdgesStats.incr(allEdges.size) - numValidEdgesStats.incr(filteredEdges.size) - numRecosHoseMessageStats.incr(filteredEdges.size) - filteredEdges - } - case false => - rejectEventStats.incr() - Future.Nil - } - } - } - - /** - * Pre-process filter that determines whether the given event should be used to build edges. - * @param event - * @return - */ - def shouldProcessEvent(event: Event): Future[Boolean] - - /** - * Update cache/event logging related to the specific event. - * By default, no action will be taken. 
Override when necessary - * @param event - */ - def updateEventStatus(event: Event): Unit = {} - - /** - * Given an event, extract info and build a sequence of edges - * @param event - * @return - */ - def buildEdges(event: Event): Future[Seq[E]] - - /** - * Given a sequence of edges, filter and return the valid edges - * @param event - * @param edges - * @return - */ - def filterEdges(event: Event, edges: Seq[E]): Future[Seq[E]] -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/SocialWriteEventToUserUserGraphBuilder.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/SocialWriteEventToUserUserGraphBuilder.scala deleted file mode 100644 index edebbada1..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/SocialWriteEventToUserUserGraphBuilder.scala +++ /dev/null @@ -1,73 +0,0 @@ -package com.twitter.recosinjector.edges - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.recos.util.Action -import com.twitter.socialgraph.thriftscala.{ - Action => SocialGraphAction, - FollowGraphEvent, - FollowType, - WriteEvent -} -import com.twitter.util.Future - -/** - * Converts a WriteEvent to UserUserGraph's messages, including Mention and Mediatag messages - */ -class SocialWriteEventToUserUserGraphBuilder()(override implicit val statsReceiver: StatsReceiver) - extends EventToMessageBuilder[WriteEvent, UserUserEdge] { - private val followOrFrictionlessFollowCounter = - statsReceiver.counter("num_follow_or_frictionless") - private val notFollowOrFrictionlessFollowCounter = - statsReceiver.counter("num_not_follow_or_frictionless") - private val followEdgeCounter = statsReceiver.counter("num_follow_edge") - - /** - * For now, we are only interested in Follow events - */ - override def shouldProcessEvent(event: WriteEvent): Future[Boolean] = { - event.action match { - case SocialGraphAction.Follow | SocialGraphAction.FrictionlessFollow => - 
followOrFrictionlessFollowCounter.incr() - Future(true) - case _ => - notFollowOrFrictionlessFollowCounter.incr() - Future(false) - } - } - - /** - * Determine whether a Follow event is valid/error free. - */ - private def isValidFollowEvent(followEvent: FollowGraphEvent): Boolean = { - followEvent.followType match { - case Some(FollowType.NormalFollow) | Some(FollowType.FrictionlessFollow) => - followEvent.result.validationError.isEmpty - case _ => - false - } - } - - override def buildEdges(event: WriteEvent): Future[Seq[UserUserEdge]] = { - val userUserEdges = event.follow - .map(_.collect { - case followEvent if isValidFollowEvent(followEvent) => - val sourceUserId = followEvent.result.request.source - val targetUserId = followEvent.result.request.target - followEdgeCounter.incr() - UserUserEdge( - sourceUserId, - targetUserId, - Action.Follow, - Some(System.currentTimeMillis()) - ) - }).getOrElse(Nil) - Future(userUserEdges) - } - - override def filterEdges( - event: WriteEvent, - edges: Seq[UserUserEdge] - ): Future[Seq[UserUserEdge]] = { - Future(edges) - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TimelineEventToUserTweetEntityGraphBuilder.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TimelineEventToUserTweetEntityGraphBuilder.scala deleted file mode 100644 index 73411b510..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TimelineEventToUserTweetEntityGraphBuilder.scala +++ /dev/null @@ -1,60 +0,0 @@ -package com.twitter.recosinjector.edges - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.recos.util.Action -import com.twitter.recosinjector.util.TweetFavoriteEventDetails -import com.twitter.util.Future - -class TimelineEventToUserTweetEntityGraphBuilder( - userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder -)( - override implicit val statsReceiver: StatsReceiver) - extends EventToMessageBuilder[TweetFavoriteEventDetails, 
UserTweetEntityEdge] { - - private val numFavEdgeCounter = statsReceiver.counter("num_favorite_edge") - private val numUnfavEdgeCounter = statsReceiver.counter("num_unfavorite_edge") - - override def shouldProcessEvent(event: TweetFavoriteEventDetails): Future[Boolean] = { - Future(true) - } - - override def buildEdges(details: TweetFavoriteEventDetails): Future[Seq[UserTweetEntityEdge]] = { - val engagement = details.userTweetEngagement - val tweetDetails = engagement.tweetDetails - - val entitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache( - tweetId = engagement.tweetId, - tweetDetails = tweetDetails - ) - - entitiesMapFut - .map { entitiesMap => - UserTweetEntityEdge( - sourceUser = engagement.engageUserId, - targetTweet = engagement.tweetId, - action = engagement.action, - metadata = engagement.engagementTimeMillis, - cardInfo = engagement.tweetDetails.map(_.cardInfo.toByte), - entitiesMap = entitiesMap, - tweetDetails = tweetDetails - ) - } - .map { edge => - edge match { - case fav if fav.action == Action.Favorite => - numFavEdgeCounter.incr() - case unfav if unfav.action == Action.Unfavorite => - numUnfavEdgeCounter.incr() - case _ => - } - Seq(edge) - } - } - - override def filterEdges( - event: TweetFavoriteEventDetails, - edges: Seq[UserTweetEntityEdge] - ): Future[Seq[UserTweetEntityEdge]] = { - Future(edges) - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TimelineEventToUserTweetGraphBuilder.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TimelineEventToUserTweetGraphBuilder.scala deleted file mode 100644 index 5d5572a54..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TimelineEventToUserTweetGraphBuilder.scala +++ /dev/null @@ -1,54 +0,0 @@ -package com.twitter.recosinjector.edges - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.recos.util.Action -import 
com.twitter.recosinjector.util.TweetFavoriteEventDetails -import com.twitter.util.Future - -class TimelineEventToUserTweetGraphBuilder( - userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder -)( - override implicit val statsReceiver: StatsReceiver) - extends EventToMessageBuilder[TweetFavoriteEventDetails, UserTweetEntityEdge] { - - override def shouldProcessEvent(event: TweetFavoriteEventDetails): Future[Boolean] = { - Future(true) - } - - override def buildEdges(details: TweetFavoriteEventDetails): Future[Seq[UserTweetEntityEdge]] = { - val engagement = details.userTweetEngagement - - engagement.action match { - case Action.Favorite => - val tweetDetails = engagement.tweetDetails - - val entitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache( - tweetId = engagement.tweetId, - tweetDetails = tweetDetails - ) - - entitiesMapFut - .map { entitiesMap => - UserTweetEntityEdge( - sourceUser = engagement.engageUserId, - targetTweet = engagement.tweetId, - action = engagement.action, - metadata = engagement.engagementTimeMillis, - cardInfo = engagement.tweetDetails.map(_.cardInfo.toByte), - entitiesMap = entitiesMap, - tweetDetails = tweetDetails - ) - } - .map(Seq(_)) - - case _ => Future.Nil - } - } - - override def filterEdges( - event: TweetFavoriteEventDetails, - edges: Seq[UserTweetEntityEdge] - ): Future[Seq[UserTweetEntityEdge]] = { - Future(edges) - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TweetEventToUserTweetEntityGraphBuilder.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TweetEventToUserTweetEntityGraphBuilder.scala deleted file mode 100644 index ef4a200ab..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TweetEventToUserTweetEntityGraphBuilder.scala +++ /dev/null @@ -1,343 +0,0 @@ -package com.twitter.recosinjector.edges - -import com.twitter.finagle.stats.StatsReceiver -import 
com.twitter.frigate.common.store.TweetCreationTimeMHStore -import com.twitter.frigate.common.util.SnowflakeUtils -import com.twitter.recos.internal.thriftscala.{RecosUserTweetInfo, TweetType} -import com.twitter.recos.util.Action -import com.twitter.recosinjector.decider.RecosInjectorDecider -import com.twitter.recosinjector.decider.RecosInjectorDeciderConstants -import com.twitter.recosinjector.util.TweetCreateEventDetails -import com.twitter.util.{Future, Time} - -class TweetEventToUserTweetEntityGraphBuilder( - userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder, - tweetCreationStore: TweetCreationTimeMHStore, - decider: RecosInjectorDecider -)( - override implicit val statsReceiver: StatsReceiver) - extends EventToMessageBuilder[TweetCreateEventDetails, UserTweetEntityEdge] { - - // TweetCreationStore counters - private val lastTweetTimeNotInMh = statsReceiver.counter("last_tweet_time_not_in_mh") - private val tweetCreationStoreInserts = statsReceiver.counter("tweet_creation_store_inserts") - - private val numInvalidActionCounter = statsReceiver.counter("num_invalid_tweet_action") - - private val numTweetEdgesCounter = statsReceiver.counter("num_tweet_edge") - private val numRetweetEdgesCounter = statsReceiver.counter("num_retweet_edge") - private val numReplyEdgesCounter = statsReceiver.counter("num_reply_edge") - private val numQuoteEdgesCounter = statsReceiver.counter("num_quote_edge") - private val numIsMentionedEdgesCounter = statsReceiver.counter("num_isMentioned_edge") - private val numIsMediataggedEdgesCounter = statsReceiver.counter("num_isMediatagged_edge") - - private val numIsDecider = statsReceiver.counter("num_decider_enabled") - private val numIsNotDecider = statsReceiver.counter("num_decider_not_enabled") - - override def shouldProcessEvent(event: TweetCreateEventDetails): Future[Boolean] = { - val isDecider = decider.isAvailable( - RecosInjectorDeciderConstants.TweetEventTransformerUserTweetEntityEdgesDecider - ) - if (isDecider) { - 
numIsDecider.incr() - Future(true) - } else { - numIsNotDecider.incr() - Future(false) - } - } - - /** - * Build edges Reply event. Reply event emits 2 edges: - * author -> Reply -> SourceTweetId - * author -> Tweet -> ReplyId - * Do not associate entities in reply tweet to the source tweet - */ - private def buildReplyEdge(event: TweetCreateEventDetails) = { - val userTweetEngagement = event.userTweetEngagement - val authorId = userTweetEngagement.engageUserId - - val replyEdgeFut = event.sourceTweetDetails - .map { sourceTweetDetails => - val sourceTweetId = sourceTweetDetails.tweet.id - val sourceTweetEntitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache( - tweetId = sourceTweetId, - tweetDetails = Some(sourceTweetDetails) - ) - - sourceTweetEntitiesMapFut.map { sourceTweetEntitiesMap => - val replyEdge = UserTweetEntityEdge( - sourceUser = authorId, - targetTweet = sourceTweetId, - action = Action.Reply, - metadata = Some(userTweetEngagement.tweetId), - cardInfo = Some(sourceTweetDetails.cardInfo.toByte), - entitiesMap = sourceTweetEntitiesMap, - tweetDetails = Some(sourceTweetDetails) - ) - numReplyEdgesCounter.incr() - Some(replyEdge) - } - }.getOrElse(Future.None) - - val tweetCreationEdgeFut = - if (decider.isAvailable(RecosInjectorDeciderConstants.EnableEmitTweetEdgeFromReply)) { - getAndUpdateLastTweetCreationTime( - authorId = authorId, - tweetId = userTweetEngagement.tweetId, - tweetType = TweetType.Reply - ).map { lastTweetTime => - val edge = UserTweetEntityEdge( - sourceUser = authorId, - targetTweet = userTweetEngagement.tweetId, - action = Action.Tweet, - metadata = lastTweetTime, - cardInfo = userTweetEngagement.tweetDetails.map(_.cardInfo.toByte), - entitiesMap = None, - tweetDetails = userTweetEngagement.tweetDetails - ) - numTweetEdgesCounter.incr() - Some(edge) - } - } else { - Future.None - } - - Future.join(replyEdgeFut, tweetCreationEdgeFut).map { - case (replyEdgeOpt, tweetCreationEdgeOpt) => - tweetCreationEdgeOpt.toSeq 
++ replyEdgeOpt.toSeq - } - } - - /** - * Build a Retweet UTEG edge: author -> RT -> SourceTweetId. - */ - private def buildRetweetEdge(event: TweetCreateEventDetails) = { - val userTweetEngagement = event.userTweetEngagement - val tweetId = userTweetEngagement.tweetId - - event.sourceTweetDetails - .map { sourceTweetDetails => - val sourceTweetId = sourceTweetDetails.tweet.id // Id of the tweet being Retweeted - val sourceTweetEntitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache( - tweetId = sourceTweetId, - tweetDetails = Some(sourceTweetDetails) - ) - - sourceTweetEntitiesMapFut.map { sourceTweetEntitiesMap => - val edge = UserTweetEntityEdge( - sourceUser = userTweetEngagement.engageUserId, - targetTweet = sourceTweetId, - action = Action.Retweet, - metadata = Some(tweetId), // metadata is the tweetId - cardInfo = Some(sourceTweetDetails.cardInfo.toByte), - entitiesMap = sourceTweetEntitiesMap, - tweetDetails = Some(sourceTweetDetails) - ) - numRetweetEdgesCounter.incr() - Seq(edge) - } - }.getOrElse(Future.Nil) - } - - /** - * Build edges for a Quote event. Quote tweet emits 2 edges: - * 1. A quote social proof: author -> Quote -> SourceTweetId - * 2. 
A tweet creation edge: author -> Tweet -> QuoteTweetId - */ - private def buildQuoteEdges( - event: TweetCreateEventDetails - ): Future[Seq[UserTweetEntityEdge]] = { - val userTweetEngagement = event.userTweetEngagement - val tweetId = userTweetEngagement.tweetId - val authorId = userTweetEngagement.engageUserId - - // do not associate entities in quote tweet to the source tweet, - // but associate entities to quote tweet in tweet creation event - val quoteTweetEdgeFut = event.sourceTweetDetails - .map { sourceTweetDetails => - val sourceTweetId = sourceTweetDetails.tweet.id // Id of the tweet being quoted - val sourceTweetEntitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache( - tweetId = sourceTweetId, - tweetDetails = event.sourceTweetDetails - ) - - sourceTweetEntitiesMapFut.map { sourceTweetEntitiesMap => - val edge = UserTweetEntityEdge( - sourceUser = authorId, - targetTweet = sourceTweetId, - action = Action.Quote, - metadata = Some(tweetId), // metadata is tweetId - cardInfo = Some(sourceTweetDetails.cardInfo.toByte), // cardInfo of the source tweet - entitiesMap = sourceTweetEntitiesMap, - tweetDetails = Some(sourceTweetDetails) - ) - numQuoteEdgesCounter.incr() - Seq(edge) - } - }.getOrElse(Future.Nil) - - val tweetCreationEdgeFut = getAndUpdateLastTweetCreationTime( - authorId = authorId, - tweetId = tweetId, - tweetType = TweetType.Quote - ).map { lastTweetTime => - val metadata = lastTweetTime - val cardInfo = userTweetEngagement.tweetDetails.map(_.cardInfo.toByte) - val edge = UserTweetEntityEdge( - sourceUser = authorId, - targetTweet = tweetId, - action = Action.Tweet, - metadata = metadata, - cardInfo = cardInfo, - entitiesMap = None, - tweetDetails = userTweetEngagement.tweetDetails - ) - numTweetEdgesCounter.incr() - Seq(edge) - } - - Future.join(quoteTweetEdgeFut, tweetCreationEdgeFut).map { - case (quoteEdge, creationEdge) => - quoteEdge ++ creationEdge - } - } - - /** - * Build edges for a Tweet event. 
A Tweet emits 3 tyes edges: - * 1. A tweet creation edge: author -> Tweet -> TweetId - * 2. IsMentioned edges: mentionedUserId -> IsMentioned -> TweetId - * 3. IsMediatagged edges: mediataggedUserId -> IsMediatagged -> TweetId - */ - private def buildTweetEdges(event: TweetCreateEventDetails): Future[Seq[UserTweetEntityEdge]] = { - val userTweetEngagement = event.userTweetEngagement - val tweetDetails = userTweetEngagement.tweetDetails - val tweetId = userTweetEngagement.tweetId - val authorId = userTweetEngagement.engageUserId - - val cardInfo = tweetDetails.map(_.cardInfo.toByte) - - val entitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache( - tweetId = tweetId, - tweetDetails = tweetDetails - ) - - val lastTweetTimeFut = getAndUpdateLastTweetCreationTime( - authorId = authorId, - tweetId = tweetId, - tweetType = TweetType.Tweet - ) - - Future.join(entitiesMapFut, lastTweetTimeFut).map { - case (entitiesMap, lastTweetTime) => - val tweetCreationEdge = UserTweetEntityEdge( - sourceUser = authorId, - targetTweet = tweetId, - action = Action.Tweet, - metadata = lastTweetTime, - cardInfo = cardInfo, - entitiesMap = entitiesMap, - tweetDetails = userTweetEngagement.tweetDetails - ) - numTweetEdgesCounter.incr() - - val isMentionedEdges = event.validMentionUserIds - .map(_.map { mentionedUserId => - UserTweetEntityEdge( - sourceUser = mentionedUserId, - targetTweet = tweetId, - action = Action.IsMentioned, - metadata = Some(tweetId), - cardInfo = cardInfo, - entitiesMap = entitiesMap, - tweetDetails = userTweetEngagement.tweetDetails - ) - }).getOrElse(Nil) - numIsMentionedEdgesCounter.incr(isMentionedEdges.size) - - val isMediataggedEdges = event.validMediatagUserIds - .map(_.map { mediataggedUserId => - UserTweetEntityEdge( - sourceUser = mediataggedUserId, - targetTweet = tweetId, - action = Action.IsMediaTagged, - metadata = Some(tweetId), - cardInfo = cardInfo, - entitiesMap = entitiesMap, - tweetDetails = userTweetEngagement.tweetDetails - ) - 
}).getOrElse(Nil) - numIsMediataggedEdgesCounter.incr(isMediataggedEdges.size) - - Seq(tweetCreationEdge) ++ isMentionedEdges ++ isMediataggedEdges - } - } - - /** - * For a given user, read the user's last time tweeted from the MH store, and - * write the new tweet time into the MH store before returning. - * Note this function is async, so the MH write operations will continue to execute on its own. - * This might create a read/write race condition, but it's expected. - */ - private def getAndUpdateLastTweetCreationTime( - authorId: Long, - tweetId: Long, - tweetType: TweetType - ): Future[Option[Long]] = { - val newTweetInfo = RecosUserTweetInfo( - authorId, - tweetId, - tweetType, - SnowflakeUtils.tweetCreationTime(tweetId).map(_.inMillis).getOrElse(Time.now.inMillis) - ) - - tweetCreationStore - .get(authorId) - .map(_.map { previousTweetInfoSeq => - val lastTweetTime = previousTweetInfoSeq - .filter(info => info.tweetType == TweetType.Tweet || info.tweetType == TweetType.Quote) - .map(_.tweetTimestamp) - .sortBy(-_) - .headOption // Fetch the latest time user Tweeted or Quoted - .getOrElse( - Time.Bottom.inMillis - ) // Last tweet time never recorded in MH, default to oldest point in time - - if (lastTweetTime == Time.Bottom.inMillis) lastTweetTimeNotInMh.incr() - lastTweetTime - }) - .ensure { - tweetCreationStore - .put(authorId, newTweetInfo) - .onSuccess(_ => tweetCreationStoreInserts.incr()) - .onFailure { e => - statsReceiver.counter("write_failed_with_ex:" + e.getClass.getName).incr() - } - } - } - - override def buildEdges(event: TweetCreateEventDetails): Future[Seq[UserTweetEntityEdge]] = { - val userTweetEngagement = event.userTweetEngagement - userTweetEngagement.action match { - case Action.Reply => - buildReplyEdge(event) - case Action.Retweet => - buildRetweetEdge(event) - case Action.Tweet => - buildTweetEdges(event) - case Action.Quote => - buildQuoteEdges(event) - case _ => - numInvalidActionCounter.incr() - Future.Nil - } - - } - - override 
def filterEdges( - event: TweetCreateEventDetails, - edges: Seq[UserTweetEntityEdge] - ): Future[Seq[UserTweetEntityEdge]] = { - Future(edges) // No filtering for now. Add more if needed - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TweetEventToUserTweetGraphBuilder.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TweetEventToUserTweetGraphBuilder.scala deleted file mode 100644 index c7f223800..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TweetEventToUserTweetGraphBuilder.scala +++ /dev/null @@ -1,88 +0,0 @@ -package com.twitter.recosinjector.edges - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.frigate.common.store.TweetCreationTimeMHStore -import com.twitter.frigate.common.util.SnowflakeUtils -import com.twitter.recos.internal.thriftscala.RecosUserTweetInfo -import com.twitter.recos.internal.thriftscala.TweetType -import com.twitter.recos.util.Action -import com.twitter.recosinjector.decider.RecosInjectorDecider -import com.twitter.recosinjector.decider.RecosInjectorDeciderConstants -import com.twitter.recosinjector.util.TweetCreateEventDetails -import com.twitter.util.Future -import com.twitter.util.Time - -class TweetEventToUserTweetGraphBuilder( - userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder, - tweetCreationStore: TweetCreationTimeMHStore, - decider: RecosInjectorDecider -)( - override implicit val statsReceiver: StatsReceiver) - extends EventToMessageBuilder[TweetCreateEventDetails, UserTweetEntityEdge] { - - private val numRetweetEdgesCounter = statsReceiver.counter("num_retweet_edge") - private val numIsDecider = statsReceiver.counter("num_decider_enabled") - private val numIsNotDecider = statsReceiver.counter("num_decider_not_enabled") - - override def shouldProcessEvent(event: TweetCreateEventDetails): Future[Boolean] = { - val isDecider = decider.isAvailable( - 
RecosInjectorDeciderConstants.TweetEventTransformerUserTweetEntityEdgesDecider - ) - if (isDecider) { - numIsDecider.incr() - Future(true) - } else { - numIsNotDecider.incr() - Future(false) - } - } - - /** - * Build a Retweet edge: author -> RT -> SourceTweetId. - */ - private def buildRetweetEdge(event: TweetCreateEventDetails) = { - val userTweetEngagement = event.userTweetEngagement - val tweetId = userTweetEngagement.tweetId - - event.sourceTweetDetails - .map { sourceTweetDetails => - val sourceTweetId = sourceTweetDetails.tweet.id // Id of the tweet being Retweeted - val sourceTweetEntitiesMapFut = userTweetEntityEdgeBuilder.getEntitiesMapAndUpdateCache( - tweetId = sourceTweetId, - tweetDetails = Some(sourceTweetDetails) - ) - - sourceTweetEntitiesMapFut.map { sourceTweetEntitiesMap => - val edge = UserTweetEntityEdge( - sourceUser = userTweetEngagement.engageUserId, - targetTweet = sourceTweetId, - action = Action.Retweet, - metadata = Some(tweetId), // metadata is the tweetId - cardInfo = Some(sourceTweetDetails.cardInfo.toByte), - entitiesMap = sourceTweetEntitiesMap, - tweetDetails = Some(sourceTweetDetails) - ) - numRetweetEdgesCounter.incr() - Seq(edge) - } - }.getOrElse(Future.Nil) - } - - override def buildEdges(event: TweetCreateEventDetails): Future[Seq[UserTweetEntityEdge]] = { - val userTweetEngagement = event.userTweetEngagement - userTweetEngagement.action match { - case Action.Retweet => - buildRetweetEdge(event) - case _ => - Future.Nil - } - - } - - override def filterEdges( - event: TweetCreateEventDetails, - edges: Seq[UserTweetEntityEdge] - ): Future[Seq[UserTweetEntityEdge]] = { - Future(edges) // No filtering for now. 
Add more if needed - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TweetEventToUserUserGraphBuilder.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TweetEventToUserUserGraphBuilder.scala deleted file mode 100644 index 3eec7f55c..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/TweetEventToUserUserGraphBuilder.scala +++ /dev/null @@ -1,65 +0,0 @@ -package com.twitter.recosinjector.edges - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.recos.util.Action -import com.twitter.recosinjector.util.TweetCreateEventDetails -import com.twitter.util.Future - -/** - * Given a tweet creation event, parse for UserUserGraph edges. Specifically, when a new tweet is - * created, extract the valid mentioned and mediatagged users in the tweet and create edges for them - */ -class TweetEventToUserUserGraphBuilder( -)( - override implicit val statsReceiver: StatsReceiver) - extends EventToMessageBuilder[TweetCreateEventDetails, UserUserEdge] { - private val tweetOrQuoteEventCounter = statsReceiver.counter("num_tweet_or_quote_event") - private val nonTweetOrQuoteEventCounter = statsReceiver.counter("num_non_tweet_or_quote_event") - private val mentionEdgeCounter = statsReceiver.counter("num_mention_edge") - private val mediatagEdgeCounter = statsReceiver.counter("num_mediatag_edge") - - override def shouldProcessEvent(event: TweetCreateEventDetails): Future[Boolean] = { - // For user interactions, only new tweets and quotes are considered (no replies or retweets) - event.userTweetEngagement.action match { - case Action.Tweet | Action.Quote => - tweetOrQuoteEventCounter.incr() - Future(true) - case _ => - nonTweetOrQuoteEventCounter.incr() - Future(false) - } - } - - override def buildEdges(event: TweetCreateEventDetails): Future[Seq[UserUserEdge]] = { - val mentionEdges = event.validMentionUserIds - .map(_.map { mentionUserId => - UserUserEdge( - sourceUser = 
event.userTweetEngagement.engageUserId, - targetUser = mentionUserId, - action = Action.Mention, - metadata = Some(System.currentTimeMillis()) - ) - }).getOrElse(Nil) - - val mediatagEdges = event.validMediatagUserIds - .map(_.map { mediatagUserId => - UserUserEdge( - sourceUser = event.userTweetEngagement.engageUserId, - targetUser = mediatagUserId, - action = Action.MediaTag, - metadata = Some(System.currentTimeMillis()) - ) - }).getOrElse(Nil) - - mentionEdgeCounter.incr(mentionEdges.size) - mediatagEdgeCounter.incr(mediatagEdges.size) - Future(mentionEdges ++ mediatagEdges) - } - - override def filterEdges( - event: TweetCreateEventDetails, - edges: Seq[UserUserEdge] - ): Future[Seq[UserUserEdge]] = { - Future(edges) - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/UnifiedUserActionToUserAdGraphBuilder.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/UnifiedUserActionToUserAdGraphBuilder.scala deleted file mode 100644 index e562ff2d1..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/UnifiedUserActionToUserAdGraphBuilder.scala +++ /dev/null @@ -1,44 +0,0 @@ -package com.twitter.recosinjector.edges - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.recos.util.Action -import com.twitter.recosinjector.util.UuaEngagementEventDetails -import com.twitter.util.Future - -class UnifiedUserActionToUserAdGraphBuilder( - userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder -)( - override implicit val statsReceiver: StatsReceiver) - extends EventToMessageBuilder[UuaEngagementEventDetails, UserTweetEntityEdge] { - - override def shouldProcessEvent(event: UuaEngagementEventDetails): Future[Boolean] = { - event.userTweetEngagement.action match { - case Action.Click | Action.VideoPlayback75 | Action.Favorite => Future(true) - case _ => Future(false) - } - } - - override def buildEdges(details: UuaEngagementEventDetails): Future[Seq[UserTweetEntityEdge]] = 
{ - val engagement = details.userTweetEngagement - val tweetDetails = engagement.tweetDetails - - Future.value( - Seq( - UserTweetEntityEdge( - sourceUser = engagement.engageUserId, - targetTweet = engagement.tweetId, - action = engagement.action, - metadata = engagement.engagementTimeMillis, - cardInfo = engagement.tweetDetails.map(_.cardInfo.toByte), - entitiesMap = None, - tweetDetails = tweetDetails - ))) - } - - override def filterEdges( - event: UuaEngagementEventDetails, - edges: Seq[UserTweetEntityEdge] - ): Future[Seq[UserTweetEntityEdge]] = { - Future(edges) - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/UnifiedUserActionToUserTweetGraphPlusBuilder.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/UnifiedUserActionToUserTweetGraphPlusBuilder.scala deleted file mode 100644 index 8f5ccb2b2..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/UnifiedUserActionToUserTweetGraphPlusBuilder.scala +++ /dev/null @@ -1,51 +0,0 @@ -package com.twitter.recosinjector.edges - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.recos.util.Action -import com.twitter.recosinjector.util.UuaEngagementEventDetails -import com.twitter.util.Future - -class UnifiedUserActionToUserTweetGraphPlusBuilder( - userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder -)( - override implicit val statsReceiver: StatsReceiver) - extends EventToMessageBuilder[UuaEngagementEventDetails, UserTweetEntityEdge] { - - override def shouldProcessEvent(event: UuaEngagementEventDetails): Future[Boolean] = { - event.userTweetEngagement.action match { - case Action.Click | Action.VideoQualityView => Future(true) - case Action.Favorite | Action.Retweet | Action.Share => Future(true) - case Action.NotificationOpen | Action.EmailClick => Future(true) - case Action.Quote | Action.Reply => Future(true) - case Action.TweetNotInterestedIn | Action.TweetNotRelevant | Action.TweetSeeFewer | - 
Action.TweetReport | Action.TweetMuteAuthor | Action.TweetBlockAuthor => - Future(true) - case _ => Future(false) - } - } - - override def buildEdges(details: UuaEngagementEventDetails): Future[Seq[UserTweetEntityEdge]] = { - val engagement = details.userTweetEngagement - val tweetDetails = engagement.tweetDetails - - Future - .value( - UserTweetEntityEdge( - sourceUser = engagement.engageUserId, - targetTweet = engagement.tweetId, - action = engagement.action, - metadata = engagement.engagementTimeMillis, - cardInfo = engagement.tweetDetails.map(_.cardInfo.toByte), - entitiesMap = None, - tweetDetails = tweetDetails - ) - ).map(Seq(_)) - } - - override def filterEdges( - event: UuaEngagementEventDetails, - edges: Seq[UserTweetEntityEdge] - ): Future[Seq[UserTweetEntityEdge]] = { - Future(edges) - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/UnifiedUserActionToUserVideoGraphBuilder.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/UnifiedUserActionToUserVideoGraphBuilder.scala deleted file mode 100644 index 496c05dbd..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/UnifiedUserActionToUserVideoGraphBuilder.scala +++ /dev/null @@ -1,56 +0,0 @@ -package com.twitter.recosinjector.edges - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.recos.util.Action -import com.twitter.recosinjector.util.UuaEngagementEventDetails -import com.twitter.util.Future - -class UnifiedUserActionToUserVideoGraphBuilder( - userTweetEntityEdgeBuilder: UserTweetEntityEdgeBuilder -)( - override implicit val statsReceiver: StatsReceiver) - extends EventToMessageBuilder[UuaEngagementEventDetails, UserTweetEntityEdge] { - - private val numVideoPlayback50EdgeCounter = statsReceiver.counter("num_video_playback50_edge") - private val numUnVideoPlayback50Counter = statsReceiver.counter("num_non_video_playback50_edge") - - override def shouldProcessEvent(event: 
UuaEngagementEventDetails): Future[Boolean] = { - event.userTweetEngagement.action match { - case Action.VideoPlayback50 => Future(true) - case _ => Future(false) - } - } - - override def buildEdges(details: UuaEngagementEventDetails): Future[Seq[UserTweetEntityEdge]] = { - val engagement = details.userTweetEngagement - val tweetDetails = engagement.tweetDetails - - Future - .value( - UserTweetEntityEdge( - sourceUser = engagement.engageUserId, - targetTweet = engagement.tweetId, - action = engagement.action, - metadata = engagement.engagementTimeMillis, - cardInfo = engagement.tweetDetails.map(_.cardInfo.toByte), - entitiesMap = None, - tweetDetails = tweetDetails - ) - ).map { edge => - edge match { - case videoPlayback50 if videoPlayback50.action == Action.VideoPlayback50 => - numVideoPlayback50EdgeCounter.incr() - case _ => - numUnVideoPlayback50Counter.incr() - } - Seq(edge) - } - } - - override def filterEdges( - event: UuaEngagementEventDetails, - edges: Seq[UserTweetEntityEdge] - ): Future[Seq[UserTweetEntityEdge]] = { - Future(edges) - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/UserTweetEntityEdgeBuilder.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/UserTweetEntityEdgeBuilder.scala deleted file mode 100644 index 283378b03..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/edges/UserTweetEntityEdgeBuilder.scala +++ /dev/null @@ -1,80 +0,0 @@ -package com.twitter.recosinjector.edges - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.graphjet.algorithms.RecommendationType -import com.twitter.recosinjector.clients.CacheEntityEntry -import com.twitter.recosinjector.clients.RecosHoseEntitiesCache -import com.twitter.recosinjector.clients.UrlResolver -import com.twitter.recosinjector.util.TweetDetails -import com.twitter.util.Future -import scala.collection.Map -import scala.util.hashing.MurmurHash3 - -class UserTweetEntityEdgeBuilder( - 
cache: RecosHoseEntitiesCache, - urlResolver: UrlResolver -)( - implicit val stats: StatsReceiver) { - - def getHashedEntities(entities: Seq[String]): Seq[Int] = { - entities.map(MurmurHash3.stringHash) - } - - /** - * Given the entities and their corresponding hashedIds, store the hashId->entity mapping into a - * cache. - * This is because UTEG edges only store the hashIds, and relies on the cache values to - * recover the actual entities. This allows us to store integer values instead of string in the - * edges to save space. - */ - private def storeEntitiesInCache( - urlEntities: Seq[String], - urlHashIds: Seq[Int] - ): Future[Unit] = { - val urlCacheEntries = urlHashIds.zip(urlEntities).map { - case (hashId, url) => - CacheEntityEntry(RecosHoseEntitiesCache.UrlPrefix, hashId, url) - } - cache.updateEntitiesCache( - newCacheEntries = urlCacheEntries, - stats = stats.scope("urlCache") - ) - } - - /** - * Return an entity mapping from GraphJet recType -> hash(entity) - */ - private def getEntitiesMap( - urlHashIds: Seq[Int] - ) = { - val entitiesMap = Seq( - RecommendationType.URL.getValue.toByte -> urlHashIds - ).collect { - case (keys, ids) if ids.nonEmpty => keys -> ids - }.toMap - if (entitiesMap.isEmpty) None else Some(entitiesMap) - } - - def getEntitiesMapAndUpdateCache( - tweetId: Long, - tweetDetails: Option[TweetDetails] - ): Future[Option[Map[Byte, Seq[Int]]]] = { - val resolvedUrlFut = urlResolver - .getResolvedUrls( - urls = tweetDetails.flatMap(_.urls).getOrElse(Nil), - tweetId = tweetId - ).map(_.values.toSeq) - - resolvedUrlFut.map { resolvedUrls => - val urlEntities = resolvedUrls - val urlHashIds = getHashedEntities(urlEntities) - - // Async call to cache - storeEntitiesInCache( - urlEntities = urlEntities, - urlHashIds = urlHashIds - ) - getEntitiesMap(urlHashIds) - } - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/BUILD 
b/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/BUILD deleted file mode 100644 index e7f3cdf50..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/BUILD +++ /dev/null @@ -1,20 +0,0 @@ -scala_library( - platform = "java11", - strict_deps = False, - tags = ["bazel-compatible"], - dependencies = [ - "eventbus/client", - "recos-injector/server/config", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/clients", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/config", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/decider", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/edges", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers", - "src/thrift/com/twitter/clientapp/gen:clientapp-scala", - "src/thrift/com/twitter/gizmoduck:user-thrift-scala", - "src/thrift/com/twitter/socialgraph:thrift-scala", - "src/thrift/com/twitter/timelineservice/server/internal:thrift-scala", - "src/thrift/com/twitter/tweetypie:events-scala", - "src/thrift/com/twitter/tweetypie:tweet-scala", - ], -) diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/EventBusProcessor.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/EventBusProcessor.scala deleted file mode 100644 index 9597b0bb8..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/EventBusProcessor.scala +++ /dev/null @@ -1,60 +0,0 @@ -package com.twitter.recosinjector.event_processors - -import com.twitter.eventbus.client.{EventBusSubscriber, EventBusSubscriberBuilder} -import com.twitter.finagle.mtls.authentication.ServiceIdentifier -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.logging.Logger -import com.twitter.scrooge.{ThriftStruct, ThriftStructCodec} -import com.twitter.util.Future - -/** - * Main processor class that 
handles incoming EventBus events, which take forms of a ThriftStruct. - * This class is responsible for setting up the EventBus streams, and provides a processEvent() - * where child classes can decide what to do with incoming events - */ -trait EventBusProcessor[Event <: ThriftStruct] { - private val log = Logger() - - implicit def statsReceiver: StatsReceiver - - /** - * Full name of the EventBus stream this processor listens to - */ - val eventBusStreamName: String - - /** - * the thriftStruct definition of the objects passed in from the EventBus streams, such as - * TweetEvent, WriteEvent, etc. - */ - val thriftStruct: ThriftStructCodec[Event] - - val serviceIdentifier: ServiceIdentifier - - def processEvent(event: Event): Future[Unit] - - private def getEventBusSubscriberBuilder: EventBusSubscriberBuilder[Event] = - EventBusSubscriberBuilder() - .subscriberId(eventBusStreamName) - .serviceIdentifier(serviceIdentifier) - .thriftStruct(thriftStruct) - .numThreads(8) - .fromAllZones(true) // Receives traffic from all data centers - .skipToLatest(false) // Ensures we don't miss out on events during restart - .statsReceiver(statsReceiver) - - // lazy val ensures the subscriber is only initialized when start() is called - private lazy val eventBusSubscriber = getEventBusSubscriberBuilder.build(processEvent) - - def start(): EventBusSubscriber[Event] = eventBusSubscriber - - def stop(): Unit = { - eventBusSubscriber - .close() - .onSuccess { _ => - log.info(s"EventBus processor ${this.getClass.getSimpleName} is stopped") - } - .onFailure { ex: Throwable => - log.error(ex, s"Exception while stopping EventBus processor ${this.getClass.getSimpleName}") - } - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/SocialWriteEventProcessor.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/SocialWriteEventProcessor.scala deleted file mode 100644 index 550199f00..000000000 --- 
a/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/SocialWriteEventProcessor.scala +++ /dev/null @@ -1,33 +0,0 @@ -package com.twitter.recosinjector.event_processors - -import com.twitter.finagle.mtls.authentication.ServiceIdentifier -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.recosinjector.edges.{EventToMessageBuilder, UserUserEdge} -import com.twitter.recosinjector.publishers.KafkaEventPublisher -import com.twitter.scrooge.ThriftStructCodec -import com.twitter.socialgraph.thriftscala.WriteEvent -import com.twitter.util.Future - -/** - * This processor listens to events from social graphs services. In particular, a major use case is - * to listen to user-user follow events. - */ -class SocialWriteEventProcessor( - override val eventBusStreamName: String, - override val thriftStruct: ThriftStructCodec[WriteEvent], - override val serviceIdentifier: ServiceIdentifier, - kafkaEventPublisher: KafkaEventPublisher, - userUserGraphTopic: String, - userUserGraphMessageBuilder: EventToMessageBuilder[WriteEvent, UserUserEdge] -)( - override implicit val statsReceiver: StatsReceiver) - extends EventBusProcessor[WriteEvent] { - - override def processEvent(event: WriteEvent): Future[Unit] = { - userUserGraphMessageBuilder.processEvent(event).map { edges => - edges.foreach { edge => - kafkaEventPublisher.publish(edge.convertToRecosHoseMessage, userUserGraphTopic) - } - } - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/TimelineEventProcessor.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/TimelineEventProcessor.scala deleted file mode 100644 index 372da8e6f..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/TimelineEventProcessor.scala +++ /dev/null @@ -1,150 +0,0 @@ -package com.twitter.recosinjector.event_processors - -import com.twitter.finagle.mtls.authentication.ServiceIdentifier 
-import com.twitter.finagle.stats.StatsReceiver -import com.twitter.recos.util.Action -import com.twitter.recosinjector.clients.Gizmoduck -import com.twitter.recosinjector.clients.Tweetypie -import com.twitter.recosinjector.decider.RecosInjectorDecider -import com.twitter.recosinjector.decider.RecosInjectorDeciderConstants -import com.twitter.recosinjector.edges.TimelineEventToUserTweetEntityGraphBuilder -import com.twitter.recosinjector.filters.TweetFilter -import com.twitter.recosinjector.filters.UserFilter -import com.twitter.recosinjector.publishers.KafkaEventPublisher -import com.twitter.recosinjector.util.TweetDetails -import com.twitter.recosinjector.util.TweetFavoriteEventDetails -import com.twitter.recosinjector.util.UserTweetEngagement -import com.twitter.scrooge.ThriftStructCodec -import com.twitter.timelineservice.thriftscala.FavoriteEvent -import com.twitter.timelineservice.thriftscala.UnfavoriteEvent -import com.twitter.timelineservice.thriftscala.{Event => TimelineEvent} -import com.twitter.util.Future - -/** - * Processor for Timeline events, such as Favorite (liking) tweets - */ -class TimelineEventProcessor( - override val eventBusStreamName: String, - override val thriftStruct: ThriftStructCodec[TimelineEvent], - override val serviceIdentifier: ServiceIdentifier, - kafkaEventPublisher: KafkaEventPublisher, - userTweetEntityGraphTopic: String, - userTweetEntityGraphMessageBuilder: TimelineEventToUserTweetEntityGraphBuilder, - decider: RecosInjectorDecider, - gizmoduck: Gizmoduck, - tweetypie: Tweetypie -)( - override implicit val statsReceiver: StatsReceiver) - extends EventBusProcessor[TimelineEvent] { - - private val processEventDeciderCounter = statsReceiver.counter("num_process_timeline_event") - private val numFavoriteEventCounter = statsReceiver.counter("num_favorite_event") - private val numUnFavoriteEventCounter = statsReceiver.counter("num_unfavorite_event") - private val numNotFavoriteEventCounter = 
statsReceiver.counter("num_not_favorite_event") - - private val numSelfFavoriteCounter = statsReceiver.counter("num_self_favorite_event") - private val numNullCastTweetCounter = statsReceiver.counter("num_null_cast_tweet") - private val numTweetFailSafetyLevelCounter = statsReceiver.counter("num_fail_tweetypie_safety") - private val numFavoriteUserUnsafeCounter = statsReceiver.counter("num_favorite_user_unsafe") - private val engageUserFilter = new UserFilter(gizmoduck)(statsReceiver.scope("engage_user")) - private val tweetFilter = new TweetFilter(tweetypie) - - private val numProcessFavorite = statsReceiver.counter("num_process_favorite") - private val numNoProcessFavorite = statsReceiver.counter("num_no_process_favorite") - - private def getFavoriteEventDetails( - favoriteEvent: FavoriteEvent - ): TweetFavoriteEventDetails = { - - val engagement = UserTweetEngagement( - engageUserId = favoriteEvent.userId, - engageUser = favoriteEvent.user, - action = Action.Favorite, - engagementTimeMillis = Some(favoriteEvent.eventTimeMs), - tweetId = favoriteEvent.tweetId, // the tweet, or source tweet if target tweet is a retweet - tweetDetails = favoriteEvent.tweet.map(TweetDetails) // tweet always exists - ) - TweetFavoriteEventDetails(userTweetEngagement = engagement) - } - - private def getUnfavoriteEventDetails( - unfavoriteEvent: UnfavoriteEvent - ): TweetFavoriteEventDetails = { - val engagement = UserTweetEngagement( - engageUserId = unfavoriteEvent.userId, - engageUser = unfavoriteEvent.user, - action = Action.Unfavorite, - engagementTimeMillis = Some(unfavoriteEvent.eventTimeMs), - tweetId = unfavoriteEvent.tweetId, // the tweet, or source tweet if target tweet is a retweet - tweetDetails = unfavoriteEvent.tweet.map(TweetDetails) // tweet always exists - ) - TweetFavoriteEventDetails(userTweetEngagement = engagement) - } - - private def shouldProcessFavoriteEvent(event: TweetFavoriteEventDetails): Future[Boolean] = { - val engagement = event.userTweetEngagement - 
val engageUserId = engagement.engageUserId - val tweetId = engagement.tweetId - val authorIdOpt = engagement.tweetDetails.flatMap(_.authorId) - - val isSelfFavorite = authorIdOpt.contains(engageUserId) - val isNullCastTweet = engagement.tweetDetails.forall(_.isNullCastTweet) - val isEngageUserSafeFut = engageUserFilter.filterByUserId(engageUserId) - val isTweetPassSafetyFut = tweetFilter.filterForTweetypieSafetyLevel(tweetId) - - Future.join(isEngageUserSafeFut, isTweetPassSafetyFut).map { - case (isEngageUserSafe, isTweetPassSafety) => - if (isSelfFavorite) numSelfFavoriteCounter.incr() - if (isNullCastTweet) numNullCastTweetCounter.incr() - if (!isEngageUserSafe) numFavoriteUserUnsafeCounter.incr() - if (!isTweetPassSafety) numTweetFailSafetyLevelCounter.incr() - - !isSelfFavorite && !isNullCastTweet && isEngageUserSafe && isTweetPassSafety - } - } - - /** - * Handles a Favorite event: when shouldProcessFavoriteEvent accepts it, builds the - * UserTweetEntityGraph edges and hands each one to the Kafka publisher. - * The edge-building Future is chained into the result (flatMap) instead of being discarded, - * so the returned Future completes only after publish has been called for every edge and - * fails if edge building fails. - * @param favoriteEvent the Favorite event taken from the timeline event stream - * @return a Future that is done once the event has been published or skipped - */ - private def processFavoriteEvent(favoriteEvent: FavoriteEvent): Future[Unit] = { - val eventDetails = getFavoriteEventDetails(favoriteEvent) - shouldProcessFavoriteEvent(eventDetails).flatMap { - case true => - numProcessFavorite.incr() - // Convert the event for UserTweetEntityGraph - userTweetEntityGraphMessageBuilder.processEvent(eventDetails).map { edges => - edges.foreach { edge => - kafkaEventPublisher.publish(edge.convertToRecosHoseMessage, userTweetEntityGraphTopic) - } - } - case false => - numNoProcessFavorite.incr() - // Nothing to publish for a filtered event - Future.Unit - } - } - - private def processUnFavoriteEvent(unFavoriteEvent: UnfavoriteEvent): Future[Unit] = { - if (decider.isAvailable(RecosInjectorDeciderConstants.EnableUnfavoriteEdge)) { - val eventDetails = getUnfavoriteEventDetails(unFavoriteEvent) - // Convert the event for UserTweetEntityGraph - userTweetEntityGraphMessageBuilder.processEvent(eventDetails).map { edges => - edges.foreach { edge => - kafkaEventPublisher.publish(edge.convertToRecosHoseMessage, userTweetEntityGraphTopic) - } - } - } else { - Future.Unit - } - } - - override def processEvent(event: TimelineEvent): Future[Unit] = { -
processEventDeciderCounter.incr() - event match { - case TimelineEvent.Favorite(favoriteEvent: FavoriteEvent) => - numFavoriteEventCounter.incr() - processFavoriteEvent(favoriteEvent) - case TimelineEvent.Unfavorite(unFavoriteEvent: UnfavoriteEvent) => - numUnFavoriteEventCounter.incr() - processUnFavoriteEvent(unFavoriteEvent) - case _ => - numNotFavoriteEventCounter.incr() - Future.Unit - } - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/TweetEventProcessor.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/TweetEventProcessor.scala deleted file mode 100644 index 0d9d16fa5..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/event_processors/TweetEventProcessor.scala +++ /dev/null @@ -1,256 +0,0 @@ -package com.twitter.recosinjector.event_processors - -import com.twitter.finagle.mtls.authentication.ServiceIdentifier -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.frigate.common.util.SnowflakeUtils -import com.twitter.gizmoduck.thriftscala.User -import com.twitter.recos.util.Action -import com.twitter.recos.util.Action.Action -import com.twitter.recosinjector.clients.Gizmoduck -import com.twitter.recosinjector.clients.SocialGraph -import com.twitter.recosinjector.clients.Tweetypie -import com.twitter.recosinjector.edges.TweetEventToUserTweetEntityGraphBuilder -import com.twitter.recosinjector.edges.TweetEventToUserUserGraphBuilder -import com.twitter.recosinjector.filters.TweetFilter -import com.twitter.recosinjector.filters.UserFilter -import com.twitter.recosinjector.publishers.KafkaEventPublisher -import com.twitter.recosinjector.util.TweetCreateEventDetails -import com.twitter.recosinjector.util.TweetDetails -import com.twitter.recosinjector.util.UserTweetEngagement -import com.twitter.scrooge.ThriftStructCodec -import com.twitter.tweetypie.thriftscala.Tweet -import com.twitter.tweetypie.thriftscala.TweetCreateEvent -import 
com.twitter.tweetypie.thriftscala.TweetEvent -import com.twitter.tweetypie.thriftscala.TweetEventData -import com.twitter.util.Future - -/** - * Event processor for tweet_events EventBus stream from Tweetypie. This stream provides all the - * key events related to a new tweet, like Creation, Retweet, Quote Tweet, and Replying. - * It also carries the entities/metadata information in a tweet, including - * @ Mention, HashTag, MediaTag, URL, etc. - */ -class TweetEventProcessor( - override val eventBusStreamName: String, - override val thriftStruct: ThriftStructCodec[TweetEvent], - override val serviceIdentifier: ServiceIdentifier, - userUserGraphMessageBuilder: TweetEventToUserUserGraphBuilder, - userUserGraphTopic: String, - userTweetEntityGraphMessageBuilder: TweetEventToUserTweetEntityGraphBuilder, - userTweetEntityGraphTopic: String, - kafkaEventPublisher: KafkaEventPublisher, - socialGraph: SocialGraph, - gizmoduck: Gizmoduck, - tweetypie: Tweetypie -)( - override implicit val statsReceiver: StatsReceiver) - extends EventBusProcessor[TweetEvent] { - - private val tweetCreateEventCounter = statsReceiver.counter("num_tweet_create_events") - private val nonTweetCreateEventCounter = statsReceiver.counter("num_non_tweet_create_events") - - private val tweetActionStats = statsReceiver.scope("tweet_action") - private val numUrlCounter = statsReceiver.counter("num_tweet_url") - private val numMediaUrlCounter = statsReceiver.counter("num_tweet_media_url") - private val numHashTagCounter = statsReceiver.counter("num_tweet_hashtag") - - private val numMentionsCounter = statsReceiver.counter("num_tweet_mention") - private val numMediatagCounter = statsReceiver.counter("num_tweet_mediatag") - private val numValidMentionsCounter = statsReceiver.counter("num_tweet_valid_mention") - private val numValidMediatagCounter = statsReceiver.counter("num_tweet_valid_mediatag") - - private val numNullCastTweetCounter = statsReceiver.counter("num_null_cast_tweet") - private val 
numNullCastSourceTweetCounter = statsReceiver.counter("num_null_cast_source_tweet") - private val numTweetFailSafetyLevelCounter = statsReceiver.counter("num_fail_tweetypie_safety") - private val numAuthorUnsafeCounter = statsReceiver.counter("num_author_unsafe") - private val numProcessTweetCounter = statsReceiver.counter("num_process_tweet") - private val numNoProcessTweetCounter = statsReceiver.counter("num_no_process_tweet") - - private val selfRetweetCounter = statsReceiver.counter("num_retweets_self") - - private val engageUserFilter = new UserFilter(gizmoduck)(statsReceiver.scope("author_user")) - private val tweetFilter = new TweetFilter(tweetypie) - - private def trackTweetCreateEventStats(details: TweetCreateEventDetails): Unit = { - tweetActionStats.counter(details.userTweetEngagement.action.toString).incr() - - details.userTweetEngagement.tweetDetails.foreach { tweetDetails => - tweetDetails.mentionUserIds.foreach(mention => numMentionsCounter.incr(mention.size)) - tweetDetails.mediatagUserIds.foreach(mediatag => numMediatagCounter.incr(mediatag.size)) - tweetDetails.urls.foreach(urls => numUrlCounter.incr(urls.size)) - tweetDetails.mediaUrls.foreach(mediaUrls => numMediaUrlCounter.incr(mediaUrls.size)) - tweetDetails.hashtags.foreach(hashtags => numHashTagCounter.incr(hashtags.size)) - } - - details.validMentionUserIds.foreach(mentions => numValidMentionsCounter.incr(mentions.size)) - details.validMediatagUserIds.foreach(mediatags => numValidMediatagCounter.incr(mediatags.size)) - } - - /** - * Given a created tweet, return what type of tweet it is, i.e. Tweet, Retweet, Quote, or Reply。 - * Retweet, Quote, or Reply are responsive actions to a source tweet, so for these tweets, - * we also return the tweet id and author of the source tweet (ex. the tweet being retweeted). 
- */ - private def getTweetAction(tweetDetails: TweetDetails): Action = { - (tweetDetails.replySourceId, tweetDetails.retweetSourceId, tweetDetails.quoteSourceId) match { - case (Some(_), _, _) => - Action.Reply - case (_, Some(_), _) => - Action.Retweet - case (_, _, Some(_)) => - Action.Quote - case _ => - Action.Tweet - } - } - - /** - * Given a list of mentioned users and mediatagged users in the tweet, return the users who - * actually follow the source user. - */ - private def getFollowedByIds( - sourceUserId: Long, - mentionUserIds: Option[Seq[Long]], - mediatagUserIds: Option[Seq[Long]] - ): Future[Seq[Long]] = { - val uniqueEntityUserIds = - (mentionUserIds.getOrElse(Nil) ++ mediatagUserIds.getOrElse(Nil)).distinct - if (uniqueEntityUserIds.isEmpty) { - Future.Nil - } else { - socialGraph.followedByNotMutedBy(sourceUserId, uniqueEntityUserIds) - } - } - - private def getSourceTweet(tweetDetails: TweetDetails): Future[Option[Tweet]] = { - tweetDetails.sourceTweetId match { - case Some(sourceTweetId) => - tweetypie.getTweet(sourceTweetId) - case _ => - Future.None - } - } - - /** - * Extract and return the details when the source user created a new tweet. 
- */ - private def getTweetDetails( - tweet: Tweet, - engageUser: User - ): Future[TweetCreateEventDetails] = { - val tweetDetails = TweetDetails(tweet) - - val action = getTweetAction(tweetDetails) - val tweetCreationTimeMillis = SnowflakeUtils.tweetCreationTime(tweet.id).map(_.inMilliseconds) - val engageUserId = engageUser.id - val userTweetEngagement = UserTweetEngagement( - engageUserId = engageUserId, - engageUser = Some(engageUser), - action = action, - engagementTimeMillis = tweetCreationTimeMillis, - tweetId = tweet.id, - tweetDetails = Some(tweetDetails) - ) - - val sourceTweetFut = getSourceTweet(tweetDetails) - val followedByIdsFut = getFollowedByIds( - engageUserId, - tweetDetails.mentionUserIds, - tweetDetails.mediatagUserIds - ) - - Future.join(followedByIdsFut, sourceTweetFut).map { - case (followedByIds, sourceTweet) => - TweetCreateEventDetails( - userTweetEngagement = userTweetEngagement, - validEntityUserIds = followedByIds, - sourceTweetDetails = sourceTweet.map(TweetDetails) - ) - } - } - - /** - * Exclude any Retweets of one's own tweets - */ - private def isEventSelfRetweet(tweetEvent: TweetCreateEventDetails): Boolean = { - (tweetEvent.userTweetEngagement.action == Action.Retweet) && - tweetEvent.userTweetEngagement.tweetDetails.exists( - _.sourceTweetUserId.contains( - tweetEvent.userTweetEngagement.engageUserId - )) - } - - private def isTweetPassSafetyFilter(tweetEvent: TweetCreateEventDetails): Future[Boolean] = { - tweetEvent.userTweetEngagement.action match { - case Action.Reply | Action.Retweet | Action.Quote => - tweetEvent.userTweetEngagement.tweetDetails - .flatMap(_.sourceTweetId).map { sourceTweetId => - tweetFilter.filterForTweetypieSafetyLevel(sourceTweetId) - }.getOrElse(Future(false)) - case Action.Tweet => - tweetFilter.filterForTweetypieSafetyLevel(tweetEvent.userTweetEngagement.tweetId) - } - } - - private def shouldProcessTweetEvent(event: TweetCreateEventDetails): Future[Boolean] = { - val engagement = 
event.userTweetEngagement - val engageUserId = engagement.engageUserId - - val isNullCastTweet = engagement.tweetDetails.forall(_.isNullCastTweet) - val isNullCastSourceTweet = event.sourceTweetDetails.exists(_.isNullCastTweet) - val isSelfRetweet = isEventSelfRetweet(event) - val isEngageUserSafeFut = engageUserFilter.filterByUserId(engageUserId) - val isTweetPassSafetyFut = isTweetPassSafetyFilter(event) - - Future.join(isEngageUserSafeFut, isTweetPassSafetyFut).map { - case (isEngageUserSafe, isTweetPassSafety) => - if (isNullCastTweet) numNullCastTweetCounter.incr() - if (isNullCastSourceTweet) numNullCastSourceTweetCounter.incr() - if (!isEngageUserSafe) numAuthorUnsafeCounter.incr() - if (isSelfRetweet) selfRetweetCounter.incr() - if (!isTweetPassSafety) numTweetFailSafetyLevelCounter.incr() - - !isNullCastTweet && - !isNullCastSourceTweet && - !isSelfRetweet && - isEngageUserSafe && - isTweetPassSafety - } - } - - /** - * Handles one TweetEvent. Only TweetCreateEvent payloads are processed; every other payload - * is counted and ignored. For an accepted tweet, edges are built for both UserUserGraph and - * UserTweetEntityGraph and handed to the Kafka publisher. - * Both edge-building Futures are joined into the result instead of being discarded, so the - * returned Future completes only after publish has been called for every edge and fails if - * either builder fails. - * @param event the event read from the tweet_events stream - * @return a Future that is done once the event has been published or skipped - */ - override def processEvent(event: TweetEvent): Future[Unit] = { - event.data match { - case TweetEventData.TweetCreateEvent(event: TweetCreateEvent) => - getTweetDetails( - tweet = event.tweet, - engageUser = event.user - ).flatMap { eventWithDetails => - tweetCreateEventCounter.incr() - - shouldProcessTweetEvent(eventWithDetails).flatMap { - case true => - numProcessTweetCounter.incr() - trackTweetCreateEventStats(eventWithDetails) - // Convert the event for UserUserGraph - val userUserGraphFut = - userUserGraphMessageBuilder.processEvent(eventWithDetails).map { edges => - edges.foreach { edge => - kafkaEventPublisher.publish(edge.convertToRecosHoseMessage, userUserGraphTopic) - } - } - // Convert the event for UserTweetEntityGraph - val userTweetEntityGraphFut = - userTweetEntityGraphMessageBuilder.processEvent(eventWithDetails).map { edges => - edges.foreach { edge => - kafkaEventPublisher - .publish(edge.convertToRecosHoseMessage, userTweetEntityGraphTopic) - } - } - // Both builders are already running; wait for both before completing - Future.join(userUserGraphFut, userTweetEntityGraphFut).unit - case false => - numNoProcessTweetCounter.incr() - Future.Unit - } - } - case _ => - nonTweetCreateEventCounter.incr() - Future.Unit - } - } -} diff --git
a/recos-injector/server/src/main/scala/com/twitter/recosinjector/filters/BUILD b/recos-injector/server/src/main/scala/com/twitter/recosinjector/filters/BUILD deleted file mode 100644 index 9963bbe44..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/filters/BUILD +++ /dev/null @@ -1,10 +0,0 @@ -scala_library( - platform = "java11", - strict_deps = False, - tags = ["bazel-compatible"], - dependencies = [ - "finagle/finagle-stats", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/clients", - "src/thrift/com/twitter/gizmoduck:thrift-scala", - ], -) diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/filters/NullCastTweetFilter.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/filters/NullCastTweetFilter.scala deleted file mode 100644 index 9a19efa75..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/filters/NullCastTweetFilter.scala +++ /dev/null @@ -1,34 +0,0 @@ -package com.twitter.recosinjector.filters - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.recosinjector.clients.Tweetypie -import com.twitter.util.Future - -/** - * Filters tweets that are null cast, i.e. tweet is not delivered to a user's followers, - * not shown in the user's timeline, and does not appear in search results. - * They are mainly ads tweets. - */ -class NullCastTweetFilter( - tweetypie: Tweetypie -)( - implicit statsReceiver: StatsReceiver) { - private val stats = statsReceiver.scope(this.getClass.getSimpleName) - private val requests = stats.counter("requests") - private val filtered = stats.counter("filtered") - - // Return Future(True) to keep the Tweet. - def filter(tweetId: Long): Future[Boolean] = { - requests.incr() - tweetypie - .getTweet(tweetId) - .map { tweetOpt => - // If the null cast bit is Some(true), drop the tweet. 
- val isNullCastTweet = tweetOpt.flatMap(_.coreData).exists(_.nullcast) - if (isNullCastTweet) { - filtered.incr() - } - !isNullCastTweet - } - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/filters/TweetFilter.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/filters/TweetFilter.scala deleted file mode 100644 index aecc21515..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/filters/TweetFilter.scala +++ /dev/null @@ -1,31 +0,0 @@ -package com.twitter.recosinjector.filters - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.recosinjector.clients.Tweetypie -import com.twitter.util.Future - -class TweetFilter( - tweetypie: Tweetypie -)( - implicit statsReceiver: StatsReceiver) { - private val stats = statsReceiver.scope(this.getClass.getSimpleName) - private val requests = stats.counter("requests") - private val filtered = stats.counter("filtered") - - /** - * Query Tweetypie to see if we can fetch a tweet object successfully. TweetyPie applies a safety - * filter and will not return the tweet object if the filter does not pass. 
- */ - def filterForTweetypieSafetyLevel(tweetId: Long): Future[Boolean] = { - requests.incr() - tweetypie - .getTweet(tweetId) - .map { - case Some(_) => - true - case _ => - filtered.incr() - false - } - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/filters/UserFilter.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/filters/UserFilter.scala deleted file mode 100644 index b2e51f647..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/filters/UserFilter.scala +++ /dev/null @@ -1,69 +0,0 @@ -package com.twitter.recosinjector.filters - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.gizmoduck.thriftscala.{LabelValue, User} -import com.twitter.recosinjector.clients.Gizmoduck -import com.twitter.util.Future - -class UserFilter( - gizmoduck: Gizmoduck -)( - implicit statsReceiver: StatsReceiver) { - private val stats = statsReceiver.scope(this.getClass.getSimpleName) - private val requests = stats.counter("requests") - private val filtered = stats.counter("filtered") - - private def isUnsafe(user: User): Boolean = - user.safety.exists { s => - s.deactivated || s.suspended || s.restricted || s.nsfwUser || s.nsfwAdmin || s.isProtected - } - - private def hasNsfwHighPrecisionLabel(user: User): Boolean = - user.labels.exists { - _.labels.exists(_.labelValue == LabelValue.NsfwHighPrecision) - } - - /** - * NOTE: This will by-pass Gizmoduck's safety level, and might allow invalid users to pass filter. - * Consider using filterByUserId instead. - * Return true if the user is valid, otherwise return false. - * It will first attempt to use the user object provided by the caller, and will call Gizmoduck - * to back fill if the caller does not provide it. This helps reduce Gizmoduck traffic. 
- */ - def filterByUser( - userId: Long, - userOpt: Option[User] = None - ): Future[Boolean] = { - requests.incr() - // Use the caller-supplied user when present; otherwise look it up in Gizmoduck - val userFut = userOpt match { - case Some(user) => Future(Some(user)) - case _ => gizmoduck.getUser(userId) - } - - userFut.map { resolvedUserOpt => - val isValidUser = resolvedUserOpt.exists { user => - !isUnsafe(user) && !hasNsfwHighPrecisionLabel(user) - } - // Count every rejection, including the case where no user could be resolved, so that - // the "filtered" counter means the same thing here as in filterByUserId - if (!isValidUser) filtered.incr() - isValidUser - } - } - - /** - * Given a userId, return true if the user is valid. This is done in 2 steps: - * 1. Applying Gizmoduck's safety level while querying for the user from Gizmoduck - * 2. If a user passes Gizmoduck's safety level, check its specific user status - */ - def filterByUserId(userId: Long): Future[Boolean] = { - requests.incr() - gizmoduck - .getUser(userId) - .map { userOpt => - val isValidUser = userOpt.exists { user => - !(isUnsafe(user) || hasNsfwHighPrecisionLabel(user)) - } - if (!isValidUser) { - filtered.incr() - } - isValidUser - } - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers/BUILD b/recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers/BUILD deleted file mode 100644 index 20ea2ba3a..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers/BUILD +++ /dev/null @@ -1,12 +0,0 @@ -scala_library( - platform = "java11", - strict_deps = False, - tags = ["bazel-compatible"], - dependencies = [ - "3rdparty/jvm/org/apache/thrift:libthrift", - "finatra-internal/messaging/kafka/src/main/scala", - "servo/repo/src/main/scala", - "src/thrift/com/twitter/recos:recos-injector-scala", - "src/thrift/com/twitter/recos:recos-internal-scala", - ], -) diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers/KafkaEventPublisher.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers/KafkaEventPublisher.scala deleted file mode 100644 index 1c1ad8207..000000000 ---
a/recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers/KafkaEventPublisher.scala +++ /dev/null @@ -1,54 +0,0 @@ -package com.twitter.recosinjector.publishers - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.finagle.thrift.ClientId -import com.twitter.finatra.kafka.producers.FinagleKafkaProducerBuilder -import com.twitter.finatra.kafka.serde.ScalaSerdes -import com.twitter.recos.internal.thriftscala.RecosHoseMessage -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.config.SslConfigs -import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.serialization.StringSerializer - -case class KafkaEventPublisher( - kafkaDest: String, - outputKafkaTopicPrefix: String, - clientId: ClientId, - truststoreLocation: String) { - - private val producer = FinagleKafkaProducerBuilder[String, RecosHoseMessage]() - .dest(kafkaDest) - .clientId(clientId.name) - .keySerializer(new StringSerializer) - .valueSerializer(ScalaSerdes.Thrift[RecosHoseMessage].serializer) - .withConfig(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString) - .withConfig(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation) - .withConfig(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM) - .withConfig(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka") - .withConfig(SaslConfigs.SASL_KERBEROS_SERVER_NAME, "kafka") - // Use Native Kafka Client - .buildClient() - - def publish( - message: RecosHoseMessage, - topic: String - )( - implicit statsReceiver: StatsReceiver - ): Unit = { - val topicName = s"${outputKafkaTopicPrefix}_$topic" - // Kafka Producer is thread-safe. No extra Future-pool protect. 
- producer.send(new ProducerRecord(topicName, message)) - // NOTE(review): this counter is incremented as soon as send() has been invoked; the value - // returned by send() is not inspected here, so it counts attempted writes rather than - // acknowledged ones. Confirm that is the intended meaning of "success". - statsReceiver.counter(topicName + "_written_msg_success").incr() - } -} - -object KafkaEventPublisher { - // Kafka topics available for publishing. publish() prepends outputKafkaTopicPrefix and an - // underscore to these names to form the full topic name. - // NOTE(review): UserAdTopic is "user_tweet" rather than "user_ad"; presumably intentional, - // verify against the consumers of that topic. - val UserVideoTopic = "user_video" - val UserTweetEntityTopic = "user_tweet_entity" - val UserUserTopic = "user_user" - val UserAdTopic = "user_tweet" - val UserTweetPlusTopic = "user_tweet_plus" -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/util/BUILD b/recos-injector/server/src/main/scala/com/twitter/recosinjector/util/BUILD deleted file mode 100644 index f39ec2f4a..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/util/BUILD +++ /dev/null @@ -1,12 +0,0 @@ -scala_library( - platform = "java11", - strict_deps = False, - tags = ["bazel-compatible"], - dependencies = [ - "3rdparty/jvm/com/twitter/graphjet", - "finagle/finagle-stats", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/clients", - "src/scala/com/twitter/recos/util:recos-util", - "src/thrift/com/twitter/tweetypie:tweet-scala", - ], -) diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/util/EventDetails.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/util/EventDetails.scala deleted file mode 100644 index 9df3e752b..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/util/EventDetails.scala +++ /dev/null @@ -1,126 +0,0 @@ -package com.twitter.recosinjector.util - -import com.twitter.frigate.common.base.TweetUtil -import com.twitter.gizmoduck.thriftscala.User -import com.twitter.recos.util.Action.Action -import com.twitter.tweetypie.thriftscala.Tweet - -/** - * This is used to store information about a newly created tweet - * @param validEntityUserIds For users mentioned or mediatagged in the tweet, these follow the - * engage user and only they are considered valid - * @param sourceTweetDetails For Reply, Quote, or RT, source tweet is the tweet
being actioned on - */ -case class TweetCreateEventDetails( - userTweetEngagement: UserTweetEngagement, - validEntityUserIds: Seq[Long], - sourceTweetDetails: Option[TweetDetails]) { - // A mention is only valid if the mentioned user follows the source user - val validMentionUserIds: Option[Seq[Long]] = { - userTweetEngagement.tweetDetails.flatMap(_.mentionUserIds.map(_.intersect(validEntityUserIds))) - } - - // A mediatag is only valid if the mediatagged user follows the source user - val validMediatagUserIds: Option[Seq[Long]] = { - userTweetEngagement.tweetDetails.flatMap(_.mediatagUserIds.map(_.intersect(validEntityUserIds))) - } -} - -/** - * Stores information about a favorite/unfav engagement. - * NOTE: This could either be Likes, or UNLIKEs (i.e. when user cancels the Like) - * @param userTweetEngagement the engagement details - */ -case class TweetFavoriteEventDetails( - userTweetEngagement: UserTweetEngagement) - -/** - * Stores information about a unified user action engagement. 
- * @param userTweetEngagement the engagement details - */ -case class UuaEngagementEventDetails( - userTweetEngagement: UserTweetEngagement) - -/** - * Details about a user-tweet engagement, like when a user tweeted/liked a tweet - * @param engageUserId User that engaged with the tweet - * @param engageUser User object of the engaging user, when the event source provides one - * @param action The action the user took on the tweet - * @param engagementTimeMillis Time of the engagement in milliseconds, when known - * @param tweetId Id of the tweet the user engaged with - * @param tweetDetails Details decomposed from the tweet object, when the tweet is available - */ -case class UserTweetEngagement( - engageUserId: Long, - engageUser: Option[User], - action: Action, - engagementTimeMillis: Option[Long], - tweetId: Long, - tweetDetails: Option[TweetDetails]) - -/** - * Helper class that decomposes a tweet object and provides related details about this tweet - */ -case class TweetDetails(tweet: Tweet) { - val authorId: Option[Long] = tweet.coreData.map(_.userId) - - val urls: Option[Seq[String]] = tweet.urls.map(_.map(_.url)) - - val mediaUrls: Option[Seq[String]] = tweet.media.map(_.map(_.expandedUrl)) - - val hashtags: Option[Seq[String]] = tweet.hashtags.map(_.map(_.text)) - - // mentionUserIds include reply user ids at the beginning of a tweet - val mentionUserIds: Option[Seq[Long]] = tweet.mentions.map(_.flatMap(_.userId)) - - val mediatagUserIds: Option[Seq[Long]] = tweet.mediaTags.map { - _.tagMap.flatMap { - case (_, mediaTag) => mediaTag.flatMap(_.userId) - }.toSeq - } - - val replySourceId: Option[Long] = tweet.coreData.flatMap(_.reply.flatMap(_.inReplyToStatusId)) - val replyUserId: Option[Long] = tweet.coreData.flatMap(_.reply.map(_.inReplyToUserId)) - - val retweetSourceId: Option[Long] = tweet.coreData.flatMap(_.share.map(_.sourceStatusId)) - val retweetUserId: Option[Long] = tweet.coreData.flatMap(_.share.map(_.sourceUserId)) - - val quoteSourceId: Option[Long] = tweet.quotedTweet.map(_.tweetId) - val quoteUserId: Option[Long] = tweet.quotedTweet.map(_.userId) - val quoteTweetUrl: Option[String] = tweet.quotedTweet.flatMap(_.permalink.map(_.shortUrl)) - - //If the tweet is retweet/reply/quote, this is the
tweet that the new tweet responds to - val (sourceTweetId, sourceTweetUserId) = { - (replySourceId, retweetSourceId, quoteSourceId) match { - case (Some(replyId), _, _) => - (Some(replyId), replyUserId) - case (_, Some(retweetId), _) => - (Some(retweetId), retweetUserId) - case (_, _, Some(quoteId)) => - (Some(quoteId), quoteUserId) - case _ => - (None, None) - } - } - - // Boolean information - val hasPhoto: Boolean = TweetUtil.containsPhotoTweet(tweet) - - val hasVideo: Boolean = TweetUtil.containsVideoTweet(tweet) - - // TweetyPie does not populate url fields in a quote tweet create event, even though we - // consider quote tweets as url tweets. This boolean helps make up for it. - // Details: https://groups.google.com/a/twitter.com/d/msg/eng/BhK1XAcSSWE/F8Gc4_5uDwAJ - val hasQuoteTweetUrl: Boolean = tweet.quotedTweet.exists(_.permalink.isDefined) - - val hasUrl: Boolean = this.urls.exists(_.nonEmpty) || hasQuoteTweetUrl - - val hasHashtag: Boolean = this.hashtags.exists(_.nonEmpty) - - val isCard: Boolean = hasUrl | hasPhoto | hasVideo - - implicit def bool2Long(b: Boolean): Long = if (b) 1L else 0L - - // Return a hashed long that contains card type information of the tweet - val cardInfo: Long = isCard | (hasUrl << 1) | (hasPhoto << 2) | (hasVideo << 3) - - // nullcast tweet is one that is purposefully not broadcast to followers, ex. an ad tweet. 
- val isNullCastTweet: Boolean = tweet.coreData.exists(_.nullcast) -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/uua_processors/BUILD b/recos-injector/server/src/main/scala/com/twitter/recosinjector/uua_processors/BUILD deleted file mode 100644 index c1370fe90..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/uua_processors/BUILD +++ /dev/null @@ -1,24 +0,0 @@ -scala_library( - platform = "java11", - strict_deps = False, - tags = ["bazel-compatible"], - dependencies = [ - "eventbus/client", - "kafka/finagle-kafka/finatra-kafka/src/main/scala", - "kafka/libs/src/main/scala/com/twitter/kafka/client/headers", - "kafka/libs/src/main/scala/com/twitter/kafka/client/processor", - "recos-injector/server/config", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/clients", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/config", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/decider", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/edges", - "recos-injector/server/src/main/scala/com/twitter/recosinjector/publishers", - "src/thrift/com/twitter/clientapp/gen:clientapp-scala", - "src/thrift/com/twitter/gizmoduck:user-thrift-scala", - "src/thrift/com/twitter/socialgraph:thrift-scala", - "src/thrift/com/twitter/timelineservice/server/internal:thrift-scala", - "src/thrift/com/twitter/tweetypie:events-scala", - "src/thrift/com/twitter/tweetypie:tweet-scala", - "unified_user_actions/thrift/src/main/thrift/com/twitter/unified_user_actions:unified_user_actions-scala", - ], -) diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/uua_processors/UnifiedUserActionProcessor.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/uua_processors/UnifiedUserActionProcessor.scala deleted file mode 100644 index 731fa343b..000000000 --- 
a/recos-injector/server/src/main/scala/com/twitter/recosinjector/uua_processors/UnifiedUserActionProcessor.scala +++ /dev/null @@ -1,181 +0,0 @@ -package com.twitter.recosinjector.uua_processors - -import org.apache.kafka.clients.consumer.ConsumerRecord -import com.twitter.finatra.kafka.serde.UnKeyed -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.recos.util.Action -import com.twitter.recos.util.Action.Action -import com.twitter.recosinjector.clients.Gizmoduck -import com.twitter.recosinjector.clients.Tweetypie -import com.twitter.recosinjector.edges.UnifiedUserActionToUserVideoGraphBuilder -import com.twitter.recosinjector.edges.UnifiedUserActionToUserAdGraphBuilder -import com.twitter.recosinjector.edges.UnifiedUserActionToUserTweetGraphPlusBuilder -import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction -import com.twitter.unified_user_actions.thriftscala.ActionType -import com.twitter.unified_user_actions.thriftscala.Item -import com.twitter.recosinjector.filters.UserFilter -import com.twitter.recosinjector.publishers.KafkaEventPublisher -import com.twitter.recosinjector.util.TweetDetails -import com.twitter.recosinjector.util.UserTweetEngagement -import com.twitter.recosinjector.util.UuaEngagementEventDetails -import com.twitter.unified_user_actions.thriftscala.NotificationContent -import com.twitter.unified_user_actions.thriftscala.NotificationInfo -import com.twitter.util.Future - -class UnifiedUserActionProcessor( - gizmoduck: Gizmoduck, - tweetypie: Tweetypie, - kafkaEventPublisher: KafkaEventPublisher, - userVideoGraphTopic: String, - userVideoGraphBuilder: UnifiedUserActionToUserVideoGraphBuilder, - userAdGraphTopic: String, - userAdGraphBuilder: UnifiedUserActionToUserAdGraphBuilder, - userTweetGraphPlusTopic: String, - userTweetGraphPlusBuilder: UnifiedUserActionToUserTweetGraphPlusBuilder -)( - implicit statsReceiver: StatsReceiver) { - - val messagesProcessedCount = statsReceiver.counter("messages_processed") - - val 
eventsByTypeCounts = statsReceiver.scope("events_by_type") - private val numSelfEngageCounter = statsReceiver.counter("num_self_engage_event") - private val numTweetFailSafetyLevelCounter = statsReceiver.counter("num_fail_tweetypie_safety") - private val numNullCastTweetCounter = statsReceiver.counter("num_null_cast_tweet") - private val numEngageUserUnsafeCounter = statsReceiver.counter("num_engage_user_unsafe") - private val engageUserFilter = new UserFilter(gizmoduck)(statsReceiver.scope("engage_user")) - private val numNoProcessTweetCounter = statsReceiver.counter("num_no_process_tweet") - private val numProcessTweetCounter = statsReceiver.counter("num_process_tweet") - - private def getUuaEngagementEventDetails( - unifiedUserAction: UnifiedUserAction - ): Option[Future[UuaEngagementEventDetails]] = { - val userIdOpt = unifiedUserAction.userIdentifier.userId - val tweetIdOpt = unifiedUserAction.item match { - case Item.TweetInfo(tweetInfo) => Some(tweetInfo.actionTweetId) - case Item.NotificationInfo( - NotificationInfo(_, NotificationContent.TweetNotification(notification))) => - Some(notification.tweetId) - case _ => None - } - val timestamp = unifiedUserAction.eventMetadata.sourceTimestampMs - val action = getTweetAction(unifiedUserAction.actionType) - - tweetIdOpt - .flatMap { tweetId => - userIdOpt.map { engageUserId => - val tweetFut = tweetypie.getTweet(tweetId) - tweetFut.map { tweetOpt => - val tweetDetailsOpt = tweetOpt.map(TweetDetails) - val engagement = UserTweetEngagement( - engageUserId = engageUserId, - action = action, - engagementTimeMillis = Some(timestamp), - tweetId = tweetId, - engageUser = None, - tweetDetails = tweetDetailsOpt - ) - UuaEngagementEventDetails(engagement) - } - } - } - } - - private def getTweetAction(action: ActionType): Action = { - action match { - case ActionType.ClientTweetVideoPlayback50 => Action.VideoPlayback50 - case ActionType.ClientTweetClick => Action.Click - case ActionType.ClientTweetVideoPlayback75 => 
Action.VideoPlayback75 - case ActionType.ClientTweetVideoQualityView => Action.VideoQualityView - case ActionType.ServerTweetFav => Action.Favorite - case ActionType.ServerTweetReply => Action.Reply - case ActionType.ServerTweetRetweet => Action.Retweet - case ActionType.ClientTweetQuote => Action.Quote - case ActionType.ClientNotificationOpen => Action.NotificationOpen - case ActionType.ClientTweetEmailClick => Action.EmailClick - case ActionType.ClientTweetShareViaBookmark => Action.Share - case ActionType.ClientTweetShareViaCopyLink => Action.Share - case ActionType.ClientTweetSeeFewer => Action.TweetSeeFewer - case ActionType.ClientTweetNotRelevant => Action.TweetNotRelevant - case ActionType.ClientTweetNotInterestedIn => Action.TweetNotInterestedIn - case ActionType.ServerTweetReport => Action.TweetReport - case ActionType.ClientTweetMuteAuthor => Action.TweetMuteAuthor - case ActionType.ClientTweetBlockAuthor => Action.TweetBlockAuthor - case _ => Action.UnDefined - } - } - private def shouldProcessTweetEngagement( - event: UuaEngagementEventDetails, - isAdsUseCase: Boolean = false - ): Future[Boolean] = { - val engagement = event.userTweetEngagement - val engageUserId = engagement.engageUserId - val authorIdOpt = engagement.tweetDetails.flatMap(_.authorId) - - val isSelfEngage = authorIdOpt.contains(engageUserId) - val isNullCastTweet = engagement.tweetDetails.forall(_.isNullCastTweet) - val isEngageUserSafeFut = engageUserFilter.filterByUserId(engageUserId) - val isTweetPassSafety = - engagement.tweetDetails.isDefined // Tweetypie can fetch a tweet object successfully - - isEngageUserSafeFut.map { isEngageUserSafe => - if (isSelfEngage) numSelfEngageCounter.incr() - if (isNullCastTweet) numNullCastTweetCounter.incr() - if (!isEngageUserSafe) numEngageUserUnsafeCounter.incr() - if (!isTweetPassSafety) numTweetFailSafetyLevelCounter.incr() - - !isSelfEngage && (!isNullCastTweet && !isAdsUseCase || isNullCastTweet && isAdsUseCase) && isEngageUserSafe && 
isTweetPassSafety - } - } - - def apply(record: ConsumerRecord[UnKeyed, UnifiedUserAction]): Future[Unit] = { - - messagesProcessedCount.incr() - val unifiedUserAction = record.value - eventsByTypeCounts.counter(unifiedUserAction.actionType.toString).incr() - - getTweetAction(unifiedUserAction.actionType) match { - case Action.UnDefined => - numNoProcessTweetCounter.incr() - Future.Unit - case action => - getUuaEngagementEventDetails(unifiedUserAction) - .map { - _.flatMap { detail => - // The following cases are set up specifically for an ads relevance demo. - val actionForAds = Set(Action.Click, Action.Favorite, Action.VideoPlayback75) - if (actionForAds.contains(action)) - shouldProcessTweetEngagement(detail, isAdsUseCase = true).map { - case true => - userAdGraphBuilder.processEvent(detail).map { edges => - edges.foreach { edge => - kafkaEventPublisher - .publish(edge.convertToRecosHoseMessage, userAdGraphTopic) - } - } - numProcessTweetCounter.incr() - case _ => - } - - shouldProcessTweetEngagement(detail).map { - case true => - userVideoGraphBuilder.processEvent(detail).map { edges => - edges.foreach { edge => - kafkaEventPublisher - .publish(edge.convertToRecosHoseMessage, userVideoGraphTopic) - } - } - - userTweetGraphPlusBuilder.processEvent(detail).map { edges => - edges.foreach { edge => - kafkaEventPublisher - .publish(edge.convertToRecosHoseMessage, userTweetGraphPlusTopic) - } - } - numProcessTweetCounter.incr() - case _ => - } - } - }.getOrElse(Future.Unit) - } - } -} diff --git a/recos-injector/server/src/main/scala/com/twitter/recosinjector/uua_processors/UnifiedUserActionsConsumer.scala b/recos-injector/server/src/main/scala/com/twitter/recosinjector/uua_processors/UnifiedUserActionsConsumer.scala deleted file mode 100644 index 022343cea..000000000 --- a/recos-injector/server/src/main/scala/com/twitter/recosinjector/uua_processors/UnifiedUserActionsConsumer.scala +++ /dev/null @@ -1,71 +0,0 @@ -package com.twitter.recosinjector.uua_processors - 
-import com.twitter.conversions.DurationOps._ -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.finatra.kafka.consumers.FinagleKafkaConsumerBuilder -import com.twitter.finatra.kafka.domain.KafkaGroupId -import com.twitter.finatra.kafka.domain.SeekStrategy -import com.twitter.finatra.kafka.serde.ScalaSerdes -import com.twitter.finatra.kafka.serde.UnKeyed -import com.twitter.finatra.kafka.serde.UnKeyedSerde -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.config.SslConfigs -import org.apache.kafka.common.security.auth.SecurityProtocol -import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction -import com.twitter.kafka.client.processor.AtLeastOnceProcessor -import com.twitter.kafka.client.processor.ThreadSafeKafkaConsumerClient -import com.twitter.conversions.StorageUnitOps._ - -class UnifiedUserActionsConsumer( - processor: UnifiedUserActionProcessor, - truststoreLocation: String -)( - implicit statsReceiver: StatsReceiver) { - import UnifiedUserActionsConsumer._ - - private val kafkaClient = new ThreadSafeKafkaConsumerClient[UnKeyed, UnifiedUserAction]( - FinagleKafkaConsumerBuilder[UnKeyed, UnifiedUserAction]() - .groupId(KafkaGroupId(uuaRecosInjectorGroupId)) - .keyDeserializer(UnKeyedSerde.deserializer) - .valueDeserializer(ScalaSerdes.Thrift[UnifiedUserAction].deserializer) - .dest(uuaDest) - .maxPollRecords(maxPollRecords) - .maxPollInterval(maxPollInterval) - .fetchMax(fetchMax) - .seekStrategy(SeekStrategy.END) - .enableAutoCommit(false) // AtLeastOnceProcessor performs commits manually - .withConfig(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString) - .withConfig(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation) - .withConfig(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM) - .withConfig(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka") - .withConfig(SaslConfigs.SASL_KERBEROS_SERVER_NAME, 
"kafka") - .config) - - val atLeastOnceProcessor: AtLeastOnceProcessor[UnKeyed, UnifiedUserAction] = { - AtLeastOnceProcessor[UnKeyed, UnifiedUserAction]( - name = processorName, - topic = uuaTopic, - consumer = kafkaClient, - processor = processor.apply, - maxPendingRequests = maxPendingRequests, - workerThreads = workerThreads, - commitIntervalMs = commitIntervalMs, - statsReceiver = statsReceiver.scope(processorName) - ) - } - -} - -object UnifiedUserActionsConsumer { - val maxPollRecords = 1000 - val maxPollInterval = 5.minutes - val fetchMax = 1.megabytes - val maxPendingRequests = 1000 - val workerThreads = 16 - val commitIntervalMs = 10.seconds.inMilliseconds - val processorName = "unified_user_actions_processor" - val uuaTopic = "unified_user_actions_engagements" - val uuaDest = "/s/kafka/bluebird-1:kafka-tls" - val uuaRecosInjectorGroupId = "recos-injector" -}