diff --git a/user-signal-service/README.md b/user-signal-service/README.md deleted file mode 100644 index d30568cf4..000000000 --- a/user-signal-service/README.md +++ /dev/null @@ -1,5 +0,0 @@ -# User Signal Service # - -**User Signal Service** (USS) is a centralized online platform that supplies comprehensive data on user actions and behaviors on Twitter. This information encompasses both explicit signals, such as favoriting, retweeting, and replying, as well as implicit signals, including tweet clicks, video views, profile visits, and more. - -To ensure consistency and accuracy, USS gathers these signals from various underlying datasets and online services, processing them into uniform formats. These standardized source signals are then utilized in candidate retrieval and machine learning features for ranking stages. \ No newline at end of file diff --git a/user-signal-service/server/BUILD b/user-signal-service/server/BUILD deleted file mode 100644 index 76ff96764..000000000 --- a/user-signal-service/server/BUILD +++ /dev/null @@ -1,21 +0,0 @@ -jvm_binary( - name = "bin", - basename = "user-signal-service", - main = "com.twitter.usersignalservice.UserSignalServiceStratoFedServerMain", - runtime_platform = "java11", - tags = ["bazel-compatible"], - dependencies = [ - "3rdparty/jvm/ch/qos/logback:logback-classic", - "loglens/loglens-logback/src/main/scala/com/twitter/loglens/logback", - "strato/src/main/scala/com/twitter/strato/logging/logback", - "user-signal-service/server/src/main/resources", - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice", - ], -) - -# Aurora Workflows build phase convention requires a jvm_app named with ${project-name}-app -jvm_app( - name = "user-signal-service-app", - archive = "zip", - binary = ":bin", -) diff --git a/user-signal-service/server/src/main/resources/BUILD b/user-signal-service/server/src/main/resources/BUILD deleted file mode 100644 index b35d9c9d4..000000000 --- a/user-signal-service/server/src/main/resources/BUILD +++ /dev/null @@ -1,7 +0,0 @@ -resources( - sources = [ - "*.xml", - "*.yml", - "config/*.yml", - ], -) diff --git a/user-signal-service/server/src/main/resources/config/decider.yml b/user-signal-service/server/src/main/resources/config/decider.yml deleted file mode 100644 index f22a9dc22..000000000 --- a/user-signal-service/server/src/main/resources/config/decider.yml +++ /dev/null @@ -1,6 +0,0 @@ -test_value: - comment: Test Value - default_availability: 10000 -dark_traffic_percent: - comment: Percentage of traffic to send to dark traffic destination - default_availability: 0 \ No newline at end of file diff --git a/user-signal-service/server/src/main/resources/logback.xml b/user-signal-service/server/src/main/resources/logback.xml deleted file mode 100644 index 6511278df..000000000 --- a/user-signal-service/server/src/main/resources/logback.xml +++ /dev/null @@ -1,155 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - true - - - - - ${log.service.output} - - ${log.service.output}.%i - 1 - 10 - - - 50MB - - - %date %.-3level ${DEFAULT_SERVICE_PATTERN}%n - - - - - - ${log.strato_only.output} - - ${log.strato_only.output}.%i - 1 - 10 - - - 50MB - - - %date %.-3level ${DEFAULT_SERVICE_PATTERN}%n - - - - - - true - loglens - ${log.lens.index} - ${log.lens.tag}/service - - %msg%n - - - 500 - 50 - - - manhattan-client - .*InvalidRequest.* - - - - - - - - - ${async_queue_size} - ${async_max_flush_time} - - - - - ${async_queue_size} - ${async_max_flush_time} - - - - - ${async_queue_size} - ${async_max_flush_time} - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/BUILD deleted file mode 100644 index 248fff64b..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/BUILD +++ /dev/null @@ -1,9 +0,0 @@ -scala_library( - compiler_option_sets = ["fatal_warnings"], - tags = ["bazel-compatible"], - dependencies = [ - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base", - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns", - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config", - ], -) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/UserSignalServiceStratoFedServerMain.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/UserSignalServiceStratoFedServerMain.scala deleted file mode 100644 index 878310abb..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/UserSignalServiceStratoFedServerMain.scala +++ /dev/null @@ -1,32 +0,0 @@ -package com.twitter.usersignalservice - -import com.google.inject.Module -import com.twitter.inject.thrift.modules.ThriftClientIdModule -import com.twitter.usersignalservice.columns.UserSignalServiceColumn -import com.twitter.strato.fed._ -import com.twitter.strato.fed.server._ -import com.twitter.usersignalservice.module.CacheModule -import com.twitter.usersignalservice.module.MHMtlsParamsModule -import com.twitter.usersignalservice.module.SocialGraphServiceClientModule -import com.twitter.usersignalservice.module.TimerModule - -object UserSignalServiceStratoFedServerMain extends UserSignalServiceStratoFedServer - -trait UserSignalServiceStratoFedServer extends StratoFedServer { - override def dest: String = "/s/user-signal-service/user-signal-service" - - override def columns: Seq[Class[_ <: StratoFed.Column]] = - Seq( - classOf[UserSignalServiceColumn] - ) - - override def modules: Seq[Module] = - Seq( - CacheModule, - MHMtlsParamsModule, - SocialGraphServiceClientModule, - ThriftClientIdModule, - TimerModule, - ) - -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/AggregatedSignalController.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/AggregatedSignalController.scala deleted file mode 100644 index fb698b01a..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/AggregatedSignalController.scala +++ /dev/null @@ -1,58 +0,0 @@ -package com.twitter.usersignalservice.base - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.frigate.common.base.Stats -import com.twitter.storehaus.ReadableStore -import com.twitter.twistly.common.UserId -import com.twitter.usersignalservice.base.BaseSignalFetcher.Timeout -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer - -case class AggregatedSignalController( - signalsAggregationInfo: Seq[SignalAggregatedInfo], - signalsWeightMapInfo: Map[SignalType, Double], - stats: StatsReceiver, - timer: Timer) - extends ReadableStore[Query, Seq[Signal]] { - - val name: String = this.getClass.getCanonicalName - val statsReceiver: StatsReceiver = stats.scope(name) - - override def get(query: Query): Future[Option[Seq[Signal]]] = { - Stats - .trackItems(statsReceiver) { - val allSignalsFut = - Future - .collect(signalsAggregationInfo.map(_.getSignals(query.userId))).map(_.flatten.flatten) - val aggregatedSignals = - allSignalsFut.map { allSignals => - allSignals - .groupBy(_.targetInternalId).collect { - case (Some(internalId), signals) => - val mostRecentEnagementTime = signals.map(_.timestamp).max - val totalWeight = - signals - .map(signal => signalsWeightMapInfo.getOrElse(signal.signalType, 0.0)).sum - (Signal(query.signalType, mostRecentEnagementTime, Some(internalId)), totalWeight) - }.toSeq.sortBy { case (signal, weight) => (-weight, -signal.timestamp) } - .map(_._1) - .take(query.maxResults.getOrElse(Int.MaxValue)) - } - aggregatedSignals.map(Some(_)) - }.raiseWithin(Timeout)(timer).handle { - case e => - statsReceiver.counter(e.getClass.getCanonicalName).incr() - Some(Seq.empty[Signal]) - } - } -} - -case class SignalAggregatedInfo( - signalType: SignalType, - signalFetcher: ReadableStore[Query, Seq[Signal]]) { - def getSignals(userId: UserId): Future[Option[Seq[Signal]]] = { - signalFetcher.get(Query(userId, signalType, None)) - } -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BUILD deleted file mode 100644 index 83bb0aa3e..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BUILD +++ /dev/null @@ -1,16 +0,0 @@ -scala_library( - compiler_option_sets = ["fatal_warnings"], - tags = ["bazel-compatible"], - dependencies = [ - "3rdparty/src/jvm/com/twitter/storehaus:core", - "finagle/finagle-stats", - "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store", - "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/strato", - "hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common", - "relevance-platform/src/main/scala/com/twitter/relevance_platform/common/injection", - "src/scala/com/twitter/storehaus_internal/manhattan", - "src/scala/com/twitter/twistly/common", - "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala", - "user-signal-service/thrift/src/main/thrift:thrift-scala", - ], -) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BaseSignalFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BaseSignalFetcher.scala deleted file mode 100644 index 27646b9cc..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BaseSignalFetcher.scala +++ /dev/null @@ -1,90 +0,0 @@ -package com.twitter.usersignalservice -package base - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.storehaus.ReadableStore -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.util.Future -import com.twitter.twistly.common.UserId -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.frigate.common.base.Stats -import com.twitter.conversions.DurationOps._ -import com.twitter.usersignalservice.thriftscala.ClientIdentifier -import com.twitter.util.Duration -import com.twitter.util.Timer -import java.io.Serializable - -case class Query( - userId: UserId, - signalType: SignalType, - maxResults: Option[Int], - clientId: ClientIdentifier = ClientIdentifier.Unknown) - -/** - * A trait that defines a standard interface for the signal fetcher - * - * Extends this only when all other traits extending BaseSignalFetcher do not apply to - * your use case. - */ -trait BaseSignalFetcher extends ReadableStore[Query, Seq[Signal]] { - import BaseSignalFetcher._ - - /** - * This RawSignalType is the output type of `getRawSignals` and the input type of `process`. - * Override it as your own raw signal type to maintain meta data which can be used in the - * step of `process`. - * Note that the RawSignalType is an intermediate data type intended to be small to avoid - * big data chunks being passed over functions or being memcached. - */ - type RawSignalType <: Serializable - - def name: String - def statsReceiver: StatsReceiver - def timer: Timer - - /** - * This function is called by the top level class to fetch signals. It executes the pipeline to - * fetch raw signals, process and transform the signals. Exceptions and timeout control are - * handled here. - * @param query - * @return Future[Option[Seq[Signal]]] - */ - override def get(query: Query): Future[Option[Seq[Signal]]] = { - val clientStatsReceiver = statsReceiver.scope(query.clientId.name).scope(query.signalType.name) - Stats - .trackItems(clientStatsReceiver) { - val rawSignals = getRawSignals(query.userId) - val signals = process(query, rawSignals) - signals - }.raiseWithin(Timeout)(timer).handle { - case e => - clientStatsReceiver.scope("FetcherExceptions").counter(e.getClass.getCanonicalName).incr() - EmptyResponse - } - } - - /** - * Override this function to define how to fetch the raw signals from any store - * Note that the RawSignalType is an intermediate data type intended to be small to avoid - * big data chunks being passed over functions or being memcached. - * @param userId - * @return Future[Option[Seq[RawSignalType]]] - */ - def getRawSignals(userId: UserId): Future[Option[Seq[RawSignalType]]] - - /** - * Override this function to define how to process the raw signals and transform them to signals. - * @param query - * @param rawSignals - * @return Future[Option[Seq[Signal]]] - */ - def process( - query: Query, - rawSignals: Future[Option[Seq[RawSignalType]]] - ): Future[Option[Seq[Signal]]] -} - -object BaseSignalFetcher { - val Timeout: Duration = 20.milliseconds - val EmptyResponse: Option[Seq[Signal]] = Some(Seq.empty[Signal]) -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/FilteredSignalFetcherController.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/FilteredSignalFetcherController.scala deleted file mode 100644 index e2e0e96fe..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/FilteredSignalFetcherController.scala +++ /dev/null @@ -1,75 +0,0 @@ -package com.twitter.usersignalservice.base - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.frigate.common.base.Stats -import com.twitter.storehaus.ReadableStore -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer - -/** - * Combine a BaseSignalFetcher with a map of negative signalFetchers. Filter out the negative - * signals from the signals from BaseSignalFetcher. - */ -case class FilteredSignalFetcherController( - backingSignalFetcher: BaseSignalFetcher, - originSignalType: SignalType, - stats: StatsReceiver, - timer: Timer, - filterSignalFetchers: Map[SignalType, BaseSignalFetcher] = - Map.empty[SignalType, BaseSignalFetcher]) - extends ReadableStore[Query, Seq[Signal]] { - val statsReceiver: StatsReceiver = stats.scope(this.getClass.getCanonicalName) - - override def get(query: Query): Future[Option[Seq[Signal]]] = { - val clientStatsReceiver = statsReceiver.scope(query.signalType.name).scope(query.clientId.name) - Stats - .trackItems(clientStatsReceiver) { - val backingSignals = - backingSignalFetcher.get(Query(query.userId, originSignalType, None, query.clientId)) - val filteredSignals = filter(query, backingSignals) - filteredSignals - }.raiseWithin(BaseSignalFetcher.Timeout)(timer).handle { - case e => - clientStatsReceiver.scope("FetcherExceptions").counter(e.getClass.getCanonicalName).incr() - BaseSignalFetcher.EmptyResponse - } - } - - def filter( - query: Query, - rawSignals: Future[Option[Seq[Signal]]] - ): Future[Option[Seq[Signal]]] = { - Stats - .trackItems(statsReceiver) { - val originSignals = rawSignals.map(_.getOrElse(Seq.empty[Signal])) - val filterSignals = - Future - .collect { - filterSignalFetchers.map { - case (signalType, signalFetcher) => - signalFetcher - .get(Query(query.userId, signalType, None, query.clientId)) - .map(_.getOrElse(Seq.empty)) - }.toSeq - }.map(_.flatten.toSet) - val filterSignalsSet = filterSignals - .map(_.flatMap(_.targetInternalId)) - - val originSignalsWithId = - originSignals.map(_.map(signal => (signal, signal.targetInternalId))) - Future.join(originSignalsWithId, filterSignalsSet).map { - case (originSignalsWithId, filterSignalsSet) => - Some( - originSignalsWithId - .collect { - case (signal, internalIdOpt) - if internalIdOpt.nonEmpty && !filterSignalsSet.contains(internalIdOpt.get) => - signal - }.take(query.maxResults.getOrElse(Int.MaxValue))) - } - } - } - -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/ManhattanSignalFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/ManhattanSignalFetcher.scala deleted file mode 100644 index d0918a165..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/ManhattanSignalFetcher.scala +++ /dev/null @@ -1,66 +0,0 @@ -package com.twitter.usersignalservice -package base - -import com.twitter.bijection.Codec -import com.twitter.storage.client.manhattan.kv.ManhattanKVClientMtlsParams -import com.twitter.storehaus.ReadableStore -import com.twitter.storehaus_internal.manhattan.ManhattanCluster -import com.twitter.storehaus_internal.manhattan.ManhattanRO -import com.twitter.storehaus_internal.manhattan.ManhattanROConfig -import com.twitter.storehaus_internal.util.HDFSPath -import com.twitter.twistly.common.UserId -import com.twitter.util.Future -import com.twitter.storehaus_internal.util.ApplicationID -import com.twitter.storehaus_internal.util.DatasetName - -/** - * A Manhattan signal fetcher extending BaseSignalFetcher to provide an interface to fetch signals - * from a Manhattan dataset. - * - * Extends this when the underlying store is a single Manhattan dataset. - * @tparam ManhattanKeyType - * @tparam ManhattanValueType - */ -trait ManhattanSignalFetcher[ManhattanKeyType, ManhattanValueType] extends BaseSignalFetcher { - /* - Define the meta info of the Manhattan dataset - */ - protected def manhattanAppId: String - protected def manhattanDatasetName: String - protected def manhattanClusterId: ManhattanCluster - protected def manhattanKVClientMtlsParams: ManhattanKVClientMtlsParams - - protected def manhattanKeyCodec: Codec[ManhattanKeyType] - protected def manhattanRawSignalCodec: Codec[ManhattanValueType] - - /** - * Adaptor to transform the userId to the ManhattanKey - * @param userId - * @return ManhattanKeyType - */ - protected def toManhattanKey(userId: UserId): ManhattanKeyType - - /** - * Adaptor to transform the ManhattanValue to the Seq of RawSignalType - * @param manhattanValue - * @return Seq[RawSignalType] - */ - protected def toRawSignals(manhattanValue: ManhattanValueType): Seq[RawSignalType] - - protected final lazy val underlyingStore: ReadableStore[UserId, Seq[RawSignalType]] = { - ManhattanRO - .getReadableStoreWithMtls[ManhattanKeyType, ManhattanValueType]( - ManhattanROConfig( - HDFSPath(""), - ApplicationID(manhattanAppId), - DatasetName(manhattanDatasetName), - manhattanClusterId), - manhattanKVClientMtlsParams - )(manhattanKeyCodec, manhattanRawSignalCodec) - .composeKeyMapping(userId => toManhattanKey(userId)) - .mapValues(manhattanRawSignal => toRawSignals(manhattanRawSignal)) - } - - override final def getRawSignals(userId: UserId): Future[Option[Seq[RawSignalType]]] = - underlyingStore.get(userId) -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/MemcachedSignalFetcherWrapper.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/MemcachedSignalFetcherWrapper.scala deleted file mode 100644 index 4022d9021..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/MemcachedSignalFetcherWrapper.scala +++ /dev/null @@ -1,70 +0,0 @@ -package com.twitter.usersignalservice -package base - -import com.twitter.finagle.memcached.{Client => MemcachedClient} -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.hashing.KeyHasher -import com.twitter.hermit.store.common.ObservedMemcachedReadableStore -import com.twitter.relevance_platform.common.injection.LZ4Injection -import com.twitter.relevance_platform.common.injection.SeqObjectInjection -import com.twitter.storehaus.ReadableStore -import com.twitter.twistly.common.UserId -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.util.Duration -import com.twitter.util.Future -import com.twitter.util.Timer - -/** - * Use this wrapper when the latency of the signal fetcher is too high (see BaseSignalFetcher.Timeout - * ) and the results from the signal fetcher don't change often (e.g. results are generated from a - * scalding job scheduled each day). - * @param memcachedClient - * @param baseSignalFetcher - * @param ttl - * @param stats - * @param timer - */ -case class MemcachedSignalFetcherWrapper( - memcachedClient: MemcachedClient, - baseSignalFetcher: BaseSignalFetcher, - ttl: Duration, - stats: StatsReceiver, - keyPrefix: String, - timer: Timer) - extends BaseSignalFetcher { - import MemcachedSignalFetcherWrapper._ - override type RawSignalType = baseSignalFetcher.RawSignalType - - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(name).scope(baseSignalFetcher.name) - - val underlyingStore: ReadableStore[UserId, Seq[RawSignalType]] = { - val cacheUnderlyingStore = new ReadableStore[UserId, Seq[RawSignalType]] { - override def get(userId: UserId): Future[Option[Seq[RawSignalType]]] = - baseSignalFetcher.getRawSignals(userId) - } - ObservedMemcachedReadableStore.fromCacheClient( - backingStore = cacheUnderlyingStore, - cacheClient = memcachedClient, - ttl = ttl)( - valueInjection = LZ4Injection.compose(SeqObjectInjection[RawSignalType]()), - statsReceiver = statsReceiver, - keyToString = { k: UserId => - s"$keyPrefix:${keyHasher.hashKey(k.toString.getBytes)}" - } - ) - } - - override def getRawSignals(userId: UserId): Future[Option[Seq[RawSignalType]]] = - underlyingStore.get(userId) - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RawSignalType]]] - ): Future[Option[Seq[Signal]]] = baseSignalFetcher.process(query, rawSignals) - -} - -object MemcachedSignalFetcherWrapper { - private val keyHasher: KeyHasher = KeyHasher.FNV1A_64 -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/StratoSignalFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/StratoSignalFetcher.scala deleted file mode 100644 index 2d0de84b6..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/StratoSignalFetcher.scala +++ /dev/null @@ -1,61 +0,0 @@ -package com.twitter.usersignalservice -package base -import com.twitter.frigate.common.store.strato.StratoFetchableStore -import com.twitter.storehaus.ReadableStore -import com.twitter.strato.client.Client -import com.twitter.strato.data.Conv -import com.twitter.twistly.common.UserId -import com.twitter.util.Future - -/** - * A Strato signal fetcher extending BaseSignalFetcher to provide an interface to fetch signals from - * Strato Column. - * - * Extends this when the underlying store is a single Strato column. - * @tparam StratoKeyType - * @tparam StratoViewType - * @tparam StratoValueType - */ -trait StratoSignalFetcher[StratoKeyType, StratoViewType, StratoValueType] - extends BaseSignalFetcher { - /* - Define the meta info of the strato column - */ - def stratoClient: Client - def stratoColumnPath: String - def stratoView: StratoViewType - - /** - * Override these vals and remove the implicit key words. - * @return - */ - protected implicit def keyConv: Conv[StratoKeyType] - protected implicit def viewConv: Conv[StratoViewType] - protected implicit def valueConv: Conv[StratoValueType] - - /** - * Adapter to transform the userId to the StratoKeyType - * @param userId - * @return StratoKeyType - */ - protected def toStratoKey(userId: UserId): StratoKeyType - - /** - * Adapter to transform the StratoValueType to a Seq of RawSignalType - * @param stratoValue - * @return Seq[RawSignalType] - */ - protected def toRawSignals(stratoValue: StratoValueType): Seq[RawSignalType] - - protected final lazy val underlyingStore: ReadableStore[UserId, Seq[RawSignalType]] = - StratoFetchableStore - .withView[StratoKeyType, StratoViewType, StratoValueType]( - stratoClient, - stratoColumnPath, - stratoView) - .composeKeyMapping(toStratoKey) - .mapValues(toRawSignals) - - override final def getRawSignals(userId: UserId): Future[Option[Seq[RawSignalType]]] = - underlyingStore.get(userId) -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/BUILD deleted file mode 100644 index 1cb85f732..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/BUILD +++ /dev/null @@ -1,11 +0,0 @@ -scala_library( - compiler_option_sets = ["fatal_warnings"], - tags = ["bazel-compatible"], - dependencies = [ - "src/scala/com/twitter/twistly/common", - "strato/src/main/scala/com/twitter/strato/fed", - "strato/src/main/scala/com/twitter/strato/fed/server", - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service", - "user-signal-service/thrift/src/main/thrift:thrift-scala", - ], -) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/UserSignalServiceColumn.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/UserSignalServiceColumn.scala deleted file mode 100644 index aea92ecd1..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/UserSignalServiceColumn.scala +++ /dev/null @@ -1,49 +0,0 @@ -package com.twitter.usersignalservice.columns - -import com.twitter.stitch.NotFound -import com.twitter.stitch.Stitch -import com.twitter.strato.catalog.OpMetadata -import com.twitter.strato.catalog.Ops -import com.twitter.strato.config.Policy -import com.twitter.strato.config.ReadWritePolicy -import com.twitter.strato.data.Conv -import com.twitter.strato.data.Description -import com.twitter.strato.data.Lifecycle -import com.twitter.strato.fed.StratoFed -import com.twitter.strato.thrift.ScroogeConv -import com.twitter.usersignalservice.service.UserSignalService -import com.twitter.usersignalservice.thriftscala.BatchSignalRequest -import com.twitter.usersignalservice.thriftscala.BatchSignalResponse -import javax.inject.Inject - -class UserSignalServiceColumn @Inject() (userSignalService: UserSignalService) - extends StratoFed.Column(UserSignalServiceColumn.Path) - with StratoFed.Fetch.Stitch { - - override val metadata: OpMetadata = OpMetadata( - lifecycle = Some(Lifecycle.Production), - description = Some(Description.PlainText("User Signal Service Federated Column"))) - - override def ops: Ops = super.ops - - override type Key = BatchSignalRequest - override type View = Unit - override type Value = BatchSignalResponse - - override val keyConv: Conv[Key] = ScroogeConv.fromStruct[BatchSignalRequest] - override val viewConv: Conv[View] = Conv.ofType - override val valueConv: Conv[Value] = ScroogeConv.fromStruct[BatchSignalResponse] - - override def fetch(key: Key, view: View): Stitch[Result[Value]] = { - userSignalService - .userSignalServiceHandlerStoreStitch(key) - .map(result => found(result)) - .handle { - case NotFound => missing - } - } -} - -object UserSignalServiceColumn { - val Path = "recommendations/user-signal-service/signals" -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/BUILD deleted file mode 100644 index cca1bf2e0..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/BUILD +++ /dev/null @@ -1,9 +0,0 @@ -scala_library( - compiler_option_sets = ["fatal_warnings"], - tags = ["bazel-compatible"], - dependencies = [ - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base", - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals", - "user-signal-service/thrift/src/main/thrift:thrift-scala", - ], -) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/SignalFetcherConfig.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/SignalFetcherConfig.scala deleted file mode 100644 index f7238edcc..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/SignalFetcherConfig.scala +++ /dev/null @@ -1,253 +0,0 @@ -package com.twitter.usersignalservice.config - -import com.twitter.conversions.DurationOps._ -import com.twitter.finagle.memcached.{Client => MemcachedClient} -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.storehaus.ReadableStore -import com.twitter.usersignalservice.base.BaseSignalFetcher -import com.twitter.usersignalservice.base.AggregatedSignalController -import com.twitter.usersignalservice.base.FilteredSignalFetcherController -import com.twitter.usersignalservice.base.MemcachedSignalFetcherWrapper -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.base.SignalAggregatedInfo -import com.twitter.usersignalservice.signals.AccountBlocksFetcher -import com.twitter.usersignalservice.signals.AccountFollowsFetcher -import com.twitter.usersignalservice.signals.AccountMutesFetcher -import com.twitter.usersignalservice.signals.NotificationOpenAndClickFetcher -import com.twitter.usersignalservice.signals.OriginalTweetsFetcher -import com.twitter.usersignalservice.signals.ProfileVisitsFetcher -import com.twitter.usersignalservice.signals.ProfileClickFetcher -import com.twitter.usersignalservice.signals.RealGraphOonFetcher -import com.twitter.usersignalservice.signals.ReplyTweetsFetcher -import com.twitter.usersignalservice.signals.RetweetsFetcher -import com.twitter.usersignalservice.signals.TweetClickFetcher -import com.twitter.usersignalservice.signals.TweetFavoritesFetcher -import com.twitter.usersignalservice.signals.TweetSharesFetcher -import com.twitter.usersignalservice.signals.VideoTweetsPlayback50Fetcher -import com.twitter.usersignalservice.signals.VideoTweetsQualityViewFetcher -import com.twitter.usersignalservice.signals.NegativeEngagedUserFetcher -import com.twitter.usersignalservice.signals.NegativeEngagedTweetFetcher -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -class SignalFetcherConfig @Inject() ( - notificationOpenAndClickFetcher: NotificationOpenAndClickFetcher, - accountFollowsFetcher: AccountFollowsFetcher, - profileVisitsFetcher: ProfileVisitsFetcher, - tweetFavoritesFetcher: TweetFavoritesFetcher, - retweetsFetcher: RetweetsFetcher, - replyTweetsFetcher: ReplyTweetsFetcher, - originalTweetsFetcher: OriginalTweetsFetcher, - tweetSharesFetcher: TweetSharesFetcher, - memcachedClient: MemcachedClient, - realGraphOonFetcher: RealGraphOonFetcher, - tweetClickFetcher: TweetClickFetcher, - videoTweetsPlayback50Fetcher: VideoTweetsPlayback50Fetcher, - videoTweetsQualityViewFetcher: VideoTweetsQualityViewFetcher, - accountMutesFetcher: AccountMutesFetcher, - accountBlocksFetcher: AccountBlocksFetcher, - profileClickFetcher: ProfileClickFetcher, - negativeEngagedTweetFetcher: NegativeEngagedTweetFetcher, - negativeEngagedUserFetcher: NegativeEngagedUserFetcher, - statsReceiver: StatsReceiver, - timer: Timer) { - - val MemcachedProfileVisitsFetcher: BaseSignalFetcher = - MemcachedSignalFetcherWrapper( - memcachedClient, - profileVisitsFetcher, - ttl = 8.hours, - statsReceiver, - keyPrefix = "uss:pv", - timer) - - val MemcachedAccountFollowsFetcher: BaseSignalFetcher = MemcachedSignalFetcherWrapper( - memcachedClient, - accountFollowsFetcher, - ttl = 5.minute, - statsReceiver, - keyPrefix = "uss:af", - timer) - - val GoodTweetClickDdgFetcher: SignalType => FilteredSignalFetcherController = signalType => - FilteredSignalFetcherController( - tweetClickFetcher, - signalType, - statsReceiver, - timer, - Map(SignalType.NegativeEngagedTweetId -> negativeEngagedTweetFetcher) - ) - - val GoodProfileClickDdgFetcher: SignalType => FilteredSignalFetcherController = signalType => - FilteredSignalFetcherController( - profileClickFetcher, - signalType, - statsReceiver, - timer, - Map(SignalType.NegativeEngagedUserId -> negativeEngagedUserFetcher) - ) - - val GoodProfileClickDdgFetcherWithBlocksMutes: SignalType => FilteredSignalFetcherController = - signalType => - FilteredSignalFetcherController( - profileClickFetcher, - signalType, - statsReceiver, - timer, - Map( - SignalType.NegativeEngagedUserId -> negativeEngagedUserFetcher, - SignalType.AccountMute -> accountMutesFetcher, - SignalType.AccountBlock -> accountBlocksFetcher - ) - ) - - val realGraphOonFilteredFetcher: FilteredSignalFetcherController = - FilteredSignalFetcherController( - realGraphOonFetcher, - SignalType.RealGraphOon, - statsReceiver, - timer, - Map( - SignalType.NegativeEngagedUserId -> negativeEngagedUserFetcher - ) - ) - - val videoTweetsQualityViewFilteredFetcher: FilteredSignalFetcherController = - FilteredSignalFetcherController( - videoTweetsQualityViewFetcher, - SignalType.VideoView90dQualityV1, - statsReceiver, - timer, - Map(SignalType.NegativeEngagedTweetId -> negativeEngagedTweetFetcher) - ) - - val videoTweetsPlayback50FilteredFetcher: FilteredSignalFetcherController = - FilteredSignalFetcherController( - videoTweetsPlayback50Fetcher, - SignalType.VideoView90dPlayback50V1, - statsReceiver, - timer, - Map(SignalType.NegativeEngagedTweetId -> negativeEngagedTweetFetcher) - ) - - val uniformTweetSignalInfo: Seq[SignalAggregatedInfo] = Seq( - SignalAggregatedInfo(SignalType.TweetFavorite, tweetFavoritesFetcher), - SignalAggregatedInfo(SignalType.Retweet, retweetsFetcher), - SignalAggregatedInfo(SignalType.Reply, replyTweetsFetcher), - SignalAggregatedInfo(SignalType.OriginalTweet, originalTweetsFetcher), - SignalAggregatedInfo(SignalType.TweetShareV1, tweetSharesFetcher), - SignalAggregatedInfo(SignalType.VideoView90dQualityV1, videoTweetsQualityViewFilteredFetcher), - ) - - val uniformProducerSignalInfo: Seq[SignalAggregatedInfo] = Seq( - SignalAggregatedInfo(SignalType.AccountFollow, MemcachedAccountFollowsFetcher), - SignalAggregatedInfo( - SignalType.RepeatedProfileVisit90dMinVisit6V1, - MemcachedProfileVisitsFetcher), - ) - - val memcachedAccountBlocksFetcher: MemcachedSignalFetcherWrapper = MemcachedSignalFetcherWrapper( - memcachedClient, - accountBlocksFetcher, - ttl = 5.minutes, - statsReceiver, - keyPrefix = "uss:ab", - timer) - - val memcachedAccountMutesFetcher: MemcachedSignalFetcherWrapper = MemcachedSignalFetcherWrapper( - memcachedClient, - accountMutesFetcher, - ttl = 5.minutes, - statsReceiver, - keyPrefix = "uss:am", - timer) - - val SignalFetcherMapper: Map[SignalType, ReadableStore[Query, Seq[Signal]]] = Map( - /* Raw Signals */ - SignalType.AccountFollow -> accountFollowsFetcher, - SignalType.AccountFollowWithDelay -> MemcachedAccountFollowsFetcher, - SignalType.GoodProfileClick -> GoodProfileClickDdgFetcher(SignalType.GoodProfileClick), - SignalType.GoodProfileClick20s -> GoodProfileClickDdgFetcher(SignalType.GoodProfileClick20s), - SignalType.GoodProfileClick30s -> GoodProfileClickDdgFetcher(SignalType.GoodProfileClick30s), - SignalType.GoodProfileClickFiltered -> GoodProfileClickDdgFetcherWithBlocksMutes( - SignalType.GoodProfileClick), - SignalType.GoodProfileClick20sFiltered -> GoodProfileClickDdgFetcherWithBlocksMutes( - SignalType.GoodProfileClick20s), - SignalType.GoodProfileClick30sFiltered -> GoodProfileClickDdgFetcherWithBlocksMutes( - SignalType.GoodProfileClick30s), - SignalType.GoodTweetClick -> GoodTweetClickDdgFetcher(SignalType.GoodTweetClick), - SignalType.GoodTweetClick5s -> GoodTweetClickDdgFetcher(SignalType.GoodTweetClick5s), - SignalType.GoodTweetClick10s -> GoodTweetClickDdgFetcher(SignalType.GoodTweetClick10s), - SignalType.GoodTweetClick30s -> GoodTweetClickDdgFetcher(SignalType.GoodTweetClick30s), - SignalType.NegativeEngagedTweetId -> negativeEngagedTweetFetcher, - SignalType.NegativeEngagedUserId -> negativeEngagedUserFetcher, - SignalType.NotificationOpenAndClickV1 -> notificationOpenAndClickFetcher, - SignalType.OriginalTweet -> originalTweetsFetcher, - SignalType.OriginalTweet90dV2 -> originalTweetsFetcher, - SignalType.RealGraphOon -> realGraphOonFilteredFetcher, - SignalType.RepeatedProfileVisit14dMinVisit2V1 -> MemcachedProfileVisitsFetcher, - SignalType.RepeatedProfileVisit14dMinVisit2V1NoNegative -> FilteredSignalFetcherController( - MemcachedProfileVisitsFetcher, - SignalType.RepeatedProfileVisit14dMinVisit2V1NoNegative, - statsReceiver, - timer, - Map( - SignalType.AccountMute -> accountMutesFetcher, - SignalType.AccountBlock -> accountBlocksFetcher) - ), - SignalType.RepeatedProfileVisit90dMinVisit6V1 -> MemcachedProfileVisitsFetcher, - SignalType.RepeatedProfileVisit90dMinVisit6V1NoNegative -> FilteredSignalFetcherController( - MemcachedProfileVisitsFetcher, - SignalType.RepeatedProfileVisit90dMinVisit6V1NoNegative, - statsReceiver, - timer, - Map( - SignalType.AccountMute -> accountMutesFetcher, - SignalType.AccountBlock -> accountBlocksFetcher), - ), - SignalType.RepeatedProfileVisit180dMinVisit6V1 -> MemcachedProfileVisitsFetcher, - SignalType.RepeatedProfileVisit180dMinVisit6V1NoNegative -> FilteredSignalFetcherController( - MemcachedProfileVisitsFetcher, - SignalType.RepeatedProfileVisit180dMinVisit6V1NoNegative, - statsReceiver, - timer, - Map( - SignalType.AccountMute -> accountMutesFetcher, - SignalType.AccountBlock -> accountBlocksFetcher), - ), - SignalType.Reply -> replyTweetsFetcher, - SignalType.Reply90dV2 -> replyTweetsFetcher, - SignalType.Retweet -> retweetsFetcher, - SignalType.Retweet90dV2 -> retweetsFetcher, - SignalType.TweetFavorite -> tweetFavoritesFetcher, - SignalType.TweetFavorite90dV2 -> tweetFavoritesFetcher, - SignalType.TweetShareV1 -> tweetSharesFetcher, - SignalType.VideoView90dQualityV1 -> videoTweetsQualityViewFilteredFetcher, - SignalType.VideoView90dPlayback50V1 -> videoTweetsPlayback50FilteredFetcher, - /* Aggregated Signals */ - SignalType.ProducerBasedUnifiedEngagementWeightedSignal -> AggregatedSignalController( - uniformProducerSignalInfo, - uniformProducerSignalEngagementAggregation, - statsReceiver, - timer - ), - SignalType.TweetBasedUnifiedEngagementWeightedSignal -> AggregatedSignalController( - uniformTweetSignalInfo, - uniformTweetSignalEngagementAggregation, - statsReceiver, - timer - ), - SignalType.AdFavorite -> tweetFavoritesFetcher, - /* Negative Signals */ - SignalType.AccountBlock -> memcachedAccountBlocksFetcher, - SignalType.AccountMute -> memcachedAccountMutesFetcher, - SignalType.TweetDontLike -> negativeEngagedTweetFetcher, - SignalType.TweetReport -> negativeEngagedTweetFetcher, - SignalType.TweetSeeFewer -> negativeEngagedTweetFetcher, - ) - -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/BUILD deleted file mode 100644 index 96dbbeeaf..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/BUILD +++ /dev/null @@ -1,14 +0,0 @@ -scala_library( - compiler_option_sets = ["fatal_warnings"], - tags = ["bazel-compatible"], - dependencies = [ - "src/scala/com/twitter/twistly/common", - "src/scala/com/twitter/twistly/store", - "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala", - "strato/src/main/scala/com/twitter/strato/fed", - "strato/src/main/scala/com/twitter/strato/fed/server", - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config", - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module", - "user-signal-service/thrift/src/main/thrift:thrift-scala", - ], -) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/UserSignalHandler.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/UserSignalHandler.scala deleted file mode 100644 index 6fea51c4c..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/UserSignalHandler.scala +++ /dev/null @@ -1,71 +0,0 @@ -package com.twitter.usersignalservice.handler - -import com.twitter.storehaus.ReadableStore -import com.twitter.usersignalservice.thriftscala.BatchSignalRequest -import com.twitter.usersignalservice.thriftscala.BatchSignalResponse -import com.twitter.util.Future -import com.twitter.conversions.DurationOps._ -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.frigate.common.util.StatsUtil -import com.twitter.usersignalservice.config.SignalFetcherConfig -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.thriftscala.ClientIdentifier -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Duration -import com.twitter.util.Timer -import com.twitter.util.TimeoutException - -class UserSignalHandler( - signalFetcherConfig: SignalFetcherConfig, - timer: Timer, - stats: StatsReceiver) { - import UserSignalHandler._ - - val statsReceiver: StatsReceiver = stats.scope("user-signal-service/service") - - def getBatchSignalsResponse(request: BatchSignalRequest): Future[Option[BatchSignalResponse]] = { - StatsUtil.trackOptionStats(statsReceiver) { - val allSignals = request.signalRequest.map { signalRequest => - signalFetcherConfig - .SignalFetcherMapper(signalRequest.signalType) - .get(Query( - userId = request.userId, - signalType = signalRequest.signalType, - maxResults = signalRequest.maxResults.map(_.toInt), - clientId = request.clientId.getOrElse(ClientIdentifier.Unknown) - )) - .map(signalOpt => (signalRequest.signalType, signalOpt)) - } - - Future.collect(allSignals).map { signalsSeq => - val signalsMap = signalsSeq.map { - case (signalType: SignalType, signalsOpt) => - (signalType, signalsOpt.getOrElse(EmptySeq)) - }.toMap - Some(BatchSignalResponse(signalsMap)) - } - } - } - - def toReadableStore: ReadableStore[BatchSignalRequest, BatchSignalResponse] = { - new ReadableStore[BatchSignalRequest, BatchSignalResponse] { - override def get(request: BatchSignalRequest): Future[Option[BatchSignalResponse]] = { - getBatchSignalsResponse(request).raiseWithin(UserSignalServiceTimeout)(timer).rescue { - case _: TimeoutException => - statsReceiver.counter("endpointGetBatchSignals/failure/timeout").incr() - EmptyResponse - case e => - statsReceiver.counter("endpointGetBatchSignals/failure/" + e.getClass.getName).incr() - EmptyResponse - } - } - } - } -} - -object UserSignalHandler { - val UserSignalServiceTimeout: Duration = 25.milliseconds - - val EmptySeq: Seq[Nothing] = Seq.empty - val EmptyResponse: Future[Option[BatchSignalResponse]] = Future.value(Some(BatchSignalResponse())) -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/BUILD deleted file mode 100644 index d8e1e6a49..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/BUILD +++ /dev/null @@ -1,25 +0,0 @@ -scala_library( - compiler_option_sets = ["fatal_warnings"], - tags = ["bazel-compatible"], - dependencies = [ - "finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/authentication", - "finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/client", - "finagle/finagle-core/src/main", - "finagle/finagle-stats", - "finagle/finagle-thrift/src/main/scala", - "hermit/hermit-core/src/main/scala/com/twitter/hermit/predicate/socialgraph", - "relevance-platform/src/main/scala/com/twitter/relevance_platform/common/injection", - "servo/service/src/main/scala", - "src/scala/com/twitter/storehaus_internal/manhattan2", - "src/scala/com/twitter/storehaus_internal/memcache", - "src/scala/com/twitter/storehaus_internal/util", - "src/scala/com/twitter/twistly/common", - "src/scala/com/twitter/twistly/store", - "src/thrift/com/twitter/socialgraph:thrift-scala", - "stitch/stitch-storehaus", - "strato/src/main/scala/com/twitter/strato/fed", - "strato/src/main/scala/com/twitter/strato/fed/server", - "util/util-core:scala", - "util/util-stats/src/main/scala", - ], -) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/CacheModule.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/CacheModule.scala deleted file mode 100644 index 38427b6ce..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/CacheModule.scala +++ /dev/null @@ -1,34 +0,0 @@ -package com.twitter.usersignalservice.module - -import com.google.inject.Provides -import javax.inject.Singleton -import com.twitter.finagle.memcached.Client -import com.twitter.finagle.mtls.authentication.ServiceIdentifier -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.inject.TwitterModule -import com.twitter.conversions.DurationOps._ -import com.twitter.storehaus_internal.memcache.MemcacheStore -import com.twitter.storehaus_internal.util.ZkEndPoint -import com.twitter.storehaus_internal.util.ClientName - -object CacheModule extends TwitterModule { - private val cacheDest = - flag[String](name = "cache_module.dest", help = "Path to memcache service") - private val timeout = - flag[Int](name = "memcache.timeout", help = "Memcache client timeout") - - @Singleton - @Provides - def providesCache( - serviceIdentifier: ServiceIdentifier, - stats: StatsReceiver - ): Client = - MemcacheStore.memcachedClient( - name = ClientName("memcache_user_signal_service"), - dest = ZkEndPoint(cacheDest()), - timeout = timeout().milliseconds, - retries = 0, - statsReceiver = stats.scope("memcache"), - serviceIdentifier = serviceIdentifier - ) -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/MHMtlsParamsModule.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/MHMtlsParamsModule.scala deleted file mode 100644 index 1ff1a7c5d..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/MHMtlsParamsModule.scala +++ /dev/null @@ -1,18 +0,0 @@ -package com.twitter.usersignalservice.module - -import com.twitter.finagle.mtls.authentication.ServiceIdentifier -import com.twitter.inject.TwitterModule -import com.twitter.storage.client.manhattan.kv.ManhattanKVClientMtlsParams -import com.google.inject.Provides -import javax.inject.Singleton - -object MHMtlsParamsModule extends TwitterModule { - - @Singleton - @Provides - def providesManhattanMtlsParams( - serviceIdentifier: ServiceIdentifier - ): ManhattanKVClientMtlsParams = { - ManhattanKVClientMtlsParams(serviceIdentifier) - } -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/SocialGraphServiceClientModule.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/SocialGraphServiceClientModule.scala deleted file mode 100644 index 194730261..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/SocialGraphServiceClientModule.scala +++ /dev/null @@ -1,40 +0,0 @@ -package com.twitter.usersignalservice.module - -import com.twitter.inject.Injector -import com.twitter.conversions.DurationOps._ -import com.twitter.finagle._ -import com.twitter.finagle.mux.ClientDiscardedRequestException -import com.twitter.finagle.service.ReqRep -import com.twitter.finagle.service.ResponseClass -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.finatra.mtls.thriftmux.modules.MtlsClient -import com.twitter.inject.thrift.modules.ThriftMethodBuilderClientModule -import com.twitter.util.Duration -import com.twitter.util.Throw -import com.twitter.socialgraph.thriftscala.SocialGraphService - -object SocialGraphServiceClientModule - extends ThriftMethodBuilderClientModule[ - SocialGraphService.ServicePerEndpoint, - SocialGraphService.MethodPerEndpoint - ] - with MtlsClient { - override val label = "socialgraph" - override val dest = "/s/socialgraph/socialgraph" - override val requestTimeout: Duration = 30.milliseconds - - override def configureThriftMuxClient( - injector: Injector, - client: ThriftMux.Client - ): ThriftMux.Client = { - super - .configureThriftMuxClient(injector, client) - .withStatsReceiver(injector.instance[StatsReceiver].scope("clnt")) - .withSessionQualifier - .successRateFailureAccrual(successRate = 0.9, window = 30.seconds) - .withResponseClassifier { - case ReqRep(_, Throw(_: ClientDiscardedRequestException)) => ResponseClass.Ignorable - } - } - -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/TimerModule.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/TimerModule.scala deleted file mode 100644 index ffe26f8c4..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/TimerModule.scala +++ /dev/null @@ -1,12 +0,0 @@ -package com.twitter.usersignalservice.module -import com.google.inject.Provides -import com.twitter.finagle.util.DefaultTimer -import com.twitter.inject.TwitterModule -import com.twitter.util.Timer -import javax.inject.Singleton - -object TimerModule extends TwitterModule { - @Singleton - @Provides - def providesTimer: Timer = DefaultTimer -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/BUILD deleted file mode 100644 index d1cd4e3a3..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/BUILD +++ /dev/null @@ -1,13 +0,0 @@ -scala_library( - compiler_option_sets = ["fatal_warnings"], - tags = ["bazel-compatible"], - dependencies = [ - "stitch/stitch-storehaus", - "strato/src/main/scala/com/twitter/strato/fed", - "strato/src/main/scala/com/twitter/strato/fed/server", - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config", - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler", - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module", - "user-signal-service/thrift/src/main/thrift:thrift-scala", - ], -) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/UserSignalService.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/UserSignalService.scala deleted file mode 100644 index 92d956001..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/UserSignalService.scala +++ /dev/null @@ -1,26 +0,0 @@ -package com.twitter.usersignalservice -package service - -import com.google.inject.Inject -import com.google.inject.Singleton -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.stitch.storehaus.StitchOfReadableStore -import com.twitter.usersignalservice.config.SignalFetcherConfig -import com.twitter.usersignalservice.handler.UserSignalHandler -import com.twitter.usersignalservice.thriftscala.BatchSignalRequest -import com.twitter.usersignalservice.thriftscala.BatchSignalResponse -import com.twitter.util.Timer - -@Singleton -class UserSignalService @Inject() ( - signalFetcherConfig: SignalFetcherConfig, - timer: Timer, - stats: StatsReceiver) { - - private val userSignalHandler = - new UserSignalHandler(signalFetcherConfig, timer, stats) - - val userSignalServiceHandlerStoreStitch: BatchSignalRequest => com.twitter.stitch.Stitch[ - BatchSignalResponse - ] = StitchOfReadableStore(userSignalHandler.toReadableStore) -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountBlocksFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountBlocksFetcher.scala deleted file mode 100644 index a72348b7b..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountBlocksFetcher.scala +++ /dev/null @@ -1,40 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.socialgraph.thriftscala.RelationshipType -import com.twitter.socialgraph.thriftscala.SocialGraphService -import com.twitter.twistly.common.UserId -import com.twitter.usersignalservice.base.BaseSignalFetcher -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.signals.common.SGSUtils -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class AccountBlocksFetcher @Inject() ( - sgsClient: SocialGraphService.MethodPerEndpoint, - timer: Timer, - stats: StatsReceiver) - extends BaseSignalFetcher { - - override type RawSignalType = Signal - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(this.name) - - override def getRawSignals( - userId: UserId - ): Future[Option[Seq[RawSignalType]]] = { - SGSUtils.getSGSRawSignals(userId, sgsClient, RelationshipType.Blocking, SignalType.AccountBlock) - } - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RawSignalType]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map(_.map(_.take(query.maxResults.getOrElse(Int.MaxValue)))) - } -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountFollowsFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountFollowsFetcher.scala deleted file mode 100644 index 60cc2bbd7..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountFollowsFetcher.scala +++ /dev/null @@ -1,44 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.socialgraph.thriftscala.RelationshipType -import com.twitter.socialgraph.thriftscala.SocialGraphService -import com.twitter.twistly.common.UserId -import com.twitter.usersignalservice.base.BaseSignalFetcher -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.signals.common.SGSUtils -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class AccountFollowsFetcher @Inject() ( - sgsClient: SocialGraphService.MethodPerEndpoint, - timer: Timer, - stats: StatsReceiver) - extends BaseSignalFetcher { - - override type RawSignalType = Signal - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(this.name) - - override def getRawSignals( - userId: UserId - ): Future[Option[Seq[RawSignalType]]] = { - SGSUtils.getSGSRawSignals( - userId, - sgsClient, - RelationshipType.Following, - SignalType.AccountFollow) - } - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RawSignalType]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map(_.map(_.take(query.maxResults.getOrElse(Int.MaxValue)))) - } -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountMutesFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountMutesFetcher.scala deleted file mode 100644 index 27eb0a36d..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountMutesFetcher.scala +++ /dev/null @@ -1,40 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.socialgraph.thriftscala.RelationshipType -import com.twitter.socialgraph.thriftscala.SocialGraphService -import com.twitter.twistly.common.UserId -import com.twitter.usersignalservice.base.BaseSignalFetcher -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.signals.common.SGSUtils -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class AccountMutesFetcher @Inject() ( - sgsClient: SocialGraphService.MethodPerEndpoint, - timer: Timer, - stats: StatsReceiver) - extends BaseSignalFetcher { - - override type RawSignalType = Signal - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(this.name) - - override def getRawSignals( - userId: UserId - ): Future[Option[Seq[RawSignalType]]] = { - SGSUtils.getSGSRawSignals(userId, sgsClient, RelationshipType.Muting, SignalType.AccountMute) - } - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RawSignalType]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map(_.map(_.take(query.maxResults.getOrElse(Int.MaxValue)))) - } -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/BUILD deleted file mode 100644 index 50380a581..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/BUILD +++ /dev/null @@ -1,34 +0,0 @@ -scala_library( - compiler_option_sets = ["fatal_warnings"], - tags = ["bazel-compatible"], - dependencies = [ - "3rdparty/jvm/com/twitter/bijection:scrooge", - "3rdparty/jvm/javax/inject:javax.inject", - "3rdparty/src/jvm/com/twitter/storehaus:core", - "discovery-ds/src/main/thrift/com/twitter/dds/jobs/repeated_profile_visits:profile_visit-scala", - "flock-client/src/main/thrift:thrift-scala", - "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/strato", - "hermit/hermit-core/src/main/scala/com/twitter/hermit/predicate/socialgraph", - "src/scala/com/twitter/scalding_internal/job", - "src/scala/com/twitter/simclusters_v2/common", - "src/scala/com/twitter/storehaus_internal/manhattan", - "src/scala/com/twitter/storehaus_internal/manhattan/config", - "src/scala/com/twitter/storehaus_internal/manhattan2", - "src/scala/com/twitter/storehaus_internal/offline", - "src/scala/com/twitter/storehaus_internal/util", - "src/scala/com/twitter/twistly/common", - "src/thrift/com/twitter/experiments/general_metrics:general_metrics-scala", - "src/thrift/com/twitter/frigate/data_pipeline:frigate-user-history-thrift-scala", - "src/thrift/com/twitter/onboarding/relevance/tweet_engagement:tweet_engagement-scala", - "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala", - "src/thrift/com/twitter/socialgraph:thrift-scala", - "src/thrift/com/twitter/traffic_attribution:traffic_attribution-scala", - "strato/src/main/scala/com/twitter/strato/client", - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base", - "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common", - "user-signal-service/thrift/src/main/thrift:thrift-scala", - "util/util-core:util-core-util", - "util/util-core/src/main/java/com/twitter/util", - "util/util-stats/src/main/scala/com/twitter/finagle/stats", - ], -) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedTweetFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedTweetFetcher.scala deleted file mode 100644 index 22c0b0852..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedTweetFetcher.scala +++ /dev/null @@ -1,97 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.simclusters_v2.common.UserId -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.strato.client.Client -import com.twitter.strato.data.Conv -import com.twitter.strato.thrift.ScroogeConv -import com.twitter.twistly.thriftscala.RecentNegativeEngagedTweet -import com.twitter.twistly.thriftscala.TweetNegativeEngagementType -import com.twitter.twistly.thriftscala.UserRecentNegativeEngagedTweets -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.base.StratoSignalFetcher -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class NegativeEngagedTweetFetcher @Inject() ( - stratoClient: Client, - timer: Timer, - stats: StatsReceiver) - extends StratoSignalFetcher[(UserId, Long), Unit, UserRecentNegativeEngagedTweets] { - - import NegativeEngagedTweetFetcher._ - - override type RawSignalType = RecentNegativeEngagedTweet - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(name) - - override val stratoColumnPath: String = stratoPath - override val stratoView: Unit = None - - override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType - override protected val viewConv: Conv[Unit] = Conv.ofType - override protected val valueConv: Conv[UserRecentNegativeEngagedTweets] = - ScroogeConv.fromStruct[UserRecentNegativeEngagedTweets] - - override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, defaultVersion) - - override protected def toRawSignals( - stratoValue: UserRecentNegativeEngagedTweets - ): Seq[RecentNegativeEngagedTweet] = { - stratoValue.recentNegativeEngagedTweets - } - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RecentNegativeEngagedTweet]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map { - _.map { signals => - signals - .filter(signal => negativeEngagedTweetTypeFilter(query.signalType, signal)) - .map { signal => - Signal( - query.signalType, - signal.engagedAt, - Some(InternalId.TweetId(signal.tweetId)) - ) - } - .groupBy(_.targetInternalId) // groupBy if there's duplicated authorIds - .mapValues(_.maxBy(_.timestamp)) - .values - .toSeq - .sortBy(-_.timestamp) - .take(query.maxResults.getOrElse(Int.MaxValue)) - } - } - } -} - -object NegativeEngagedTweetFetcher { - - val stratoPath = "recommendations/twistly/userRecentNegativeEngagedTweets" - private val defaultVersion = 0L - - private def negativeEngagedTweetTypeFilter( - signalType: SignalType, - signal: RecentNegativeEngagedTweet - ): Boolean = { - signalType match { - case SignalType.TweetDontLike => - signal.engagementType == TweetNegativeEngagementType.DontLike - case SignalType.TweetSeeFewer => - signal.engagementType == TweetNegativeEngagementType.SeeFewer - case SignalType.TweetReport => - signal.engagementType == TweetNegativeEngagementType.ReportClick - case SignalType.NegativeEngagedTweetId => true - case _ => false - } - } - -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedUserFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedUserFetcher.scala deleted file mode 100644 index c07f61f91..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedUserFetcher.scala +++ /dev/null @@ -1,79 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.simclusters_v2.common.UserId -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.strato.client.Client -import com.twitter.strato.data.Conv -import com.twitter.strato.thrift.ScroogeConv -import com.twitter.twistly.thriftscala.RecentNegativeEngagedTweet -import com.twitter.twistly.thriftscala.UserRecentNegativeEngagedTweets -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.base.StratoSignalFetcher -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class NegativeEngagedUserFetcher @Inject() ( - stratoClient: Client, - timer: Timer, - stats: StatsReceiver) - extends StratoSignalFetcher[(UserId, Long), Unit, UserRecentNegativeEngagedTweets] { - - import NegativeEngagedUserFetcher._ - - override type RawSignalType = RecentNegativeEngagedTweet - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(name) - - override val stratoColumnPath: String = stratoPath - override val stratoView: Unit = None - - override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType - override protected val viewConv: Conv[Unit] = Conv.ofType - override protected val valueConv: Conv[UserRecentNegativeEngagedTweets] = - ScroogeConv.fromStruct[UserRecentNegativeEngagedTweets] - - override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, defaultVersion) - - override protected def toRawSignals( - stratoValue: UserRecentNegativeEngagedTweets - ): Seq[RecentNegativeEngagedTweet] = { - stratoValue.recentNegativeEngagedTweets - } - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RecentNegativeEngagedTweet]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map { - _.map { signals => - signals - .map { e => - Signal( - defaultNegativeSignalType, - e.engagedAt, - Some(InternalId.UserId(e.authorId)) - ) - } - .groupBy(_.targetInternalId) // groupBy if there's duplicated authorIds - .mapValues(_.maxBy(_.timestamp)) - .values - .toSeq - .sortBy(-_.timestamp) - } - } - } -} - -object NegativeEngagedUserFetcher { - - val stratoPath = "recommendations/twistly/userRecentNegativeEngagedTweets" - private val defaultVersion = 0L - private val defaultNegativeSignalType = SignalType.NegativeEngagedUserId - -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NotificationOpenAndClickFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NotificationOpenAndClickFetcher.scala deleted file mode 100644 index 5c40ec6a8..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NotificationOpenAndClickFetcher.scala +++ /dev/null @@ -1,145 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.frigate.common.store.strato.StratoFetchableStore -import com.twitter.frigate.data_pipeline.candidate_generation.thriftscala.ClientEngagementEvent -import com.twitter.frigate.data_pipeline.candidate_generation.thriftscala.LatestEvents -import com.twitter.frigate.data_pipeline.candidate_generation.thriftscala.LatestNegativeEngagementEvents -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.storehaus.ReadableStore -import com.twitter.strato.client.Client -import com.twitter.twistly.common.TweetId -import com.twitter.twistly.common.UserId -import com.twitter.usersignalservice.base.BaseSignalFetcher -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class NotificationOpenAndClickFetcher @Inject() ( - stratoClient: Client, - timer: Timer, - stats: StatsReceiver) - extends BaseSignalFetcher { - import NotificationOpenAndClickFetcher._ - - override type RawSignalType = ClientEngagementEvent - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(this.name) - - private val latestEventsStore: ReadableStore[UserId, LatestEvents] = { - StratoFetchableStore - .withUnitView[UserId, LatestEvents](stratoClient, latestEventStoreColumn) - } - - private val notificationNegativeEngagementStore: ReadableStore[UserId, Seq[ - NotificationNegativeEngagement - ]] = { - StratoFetchableStore - .withUnitView[UserId, LatestNegativeEngagementEvents]( - stratoClient, - labeledPushRecsNegativeEngagementsColumn).mapValues(fromLatestNegativeEngagementEvents) - } - - override def getRawSignals( - userId: UserId - ): Future[Option[Seq[RawSignalType]]] = { - val notificationNegativeEngagementEventsFut = - notificationNegativeEngagementStore.get(userId) - val latestEventsFut = latestEventsStore.get(userId) - - Future - .join(latestEventsFut, notificationNegativeEngagementEventsFut).map { - case (latestEventsOpt, latestNegativeEngagementEventsOpt) => - latestEventsOpt.map { latestEvents => - // Negative Engagement Events Filter - filterNegativeEngagementEvents( - latestEvents.engagementEvents, - latestNegativeEngagementEventsOpt.getOrElse(Seq.empty), - statsReceiver.scope("filterNegativeEngagementEvents")) - } - } - } - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RawSignalType]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map { - _.map { - _.take(query.maxResults.getOrElse(Int.MaxValue)).map { clientEngagementEvent => - Signal( - SignalType.NotificationOpenAndClickV1, - timestamp = clientEngagementEvent.timestampMillis, - targetInternalId = Some(InternalId.TweetId(clientEngagementEvent.tweetId)) - ) - } - } - } - } -} - -object NotificationOpenAndClickFetcher { - private val latestEventStoreColumn = "frigate/magicrecs/labeledPushRecsAggregated.User" - private val labeledPushRecsNegativeEngagementsColumn = - "frigate/magicrecs/labeledPushRecsNegativeEngagements.User" - - case class NotificationNegativeEngagement( - tweetId: TweetId, - timestampMillis: Long, - isNtabDisliked: Boolean, - isReportTweetClicked: Boolean, - isReportTweetDone: Boolean, - isReportUserClicked: Boolean, - isReportUserDone: Boolean) - - def fromLatestNegativeEngagementEvents( - latestNegativeEngagementEvents: LatestNegativeEngagementEvents - ): Seq[NotificationNegativeEngagement] = { - latestNegativeEngagementEvents.negativeEngagementEvents.map { event => - NotificationNegativeEngagement( - event.tweetId, - event.timestampMillis, - event.isNtabDisliked.getOrElse(false), - event.isReportTweetClicked.getOrElse(false), - event.isReportTweetDone.getOrElse(false), - event.isReportUserClicked.getOrElse(false), - event.isReportUserDone.getOrElse(false) - ) - } - } - - private def filterNegativeEngagementEvents( - engagementEvents: Seq[ClientEngagementEvent], - negativeEvents: Seq[NotificationNegativeEngagement], - statsReceiver: StatsReceiver - ): Seq[ClientEngagementEvent] = { - if (negativeEvents.nonEmpty) { - statsReceiver.counter("filterNegativeEngagementEvents").incr() - statsReceiver.stat("eventSizeBeforeFilter").add(engagementEvents.size) - - val negativeEngagementIdSet = - negativeEvents.collect { - case event - if event.isNtabDisliked || event.isReportTweetClicked || event.isReportTweetDone || event.isReportUserClicked || event.isReportUserDone => - event.tweetId - }.toSet - - // negative event size - statsReceiver.stat("negativeEventsSize").add(negativeEngagementIdSet.size) - - // filter out negative engagement sources - val result = engagementEvents.filterNot { event => - negativeEngagementIdSet.contains(event.tweetId) - } - - statsReceiver.stat("eventSizeAfterFilter").add(result.size) - - result - } else engagementEvents - } -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/OriginalTweetsFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/OriginalTweetsFetcher.scala deleted file mode 100644 index 46d5b8f9c..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/OriginalTweetsFetcher.scala +++ /dev/null @@ -1,70 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.simclusters_v2.common.UserId -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.strato.client.Client -import com.twitter.strato.data.Conv -import com.twitter.strato.thrift.ScroogeConv -import com.twitter.twistly.common.TwistlyProfile -import com.twitter.twistly.thriftscala.EngagementMetadata.OriginalTweetMetadata -import com.twitter.twistly.thriftscala.RecentEngagedTweet -import com.twitter.twistly.thriftscala.UserRecentEngagedTweets -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.base.StratoSignalFetcher -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class OriginalTweetsFetcher @Inject() ( - stratoClient: Client, - timer: Timer, - stats: StatsReceiver) - extends StratoSignalFetcher[(UserId, Long), Unit, UserRecentEngagedTweets] { - import OriginalTweetsFetcher._ - override type RawSignalType = RecentEngagedTweet - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(name) - - override val stratoColumnPath: String = - TwistlyProfile.TwistlyProdProfile.userRecentEngagedStorePath - override val stratoView: Unit = None - - override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType - override protected val viewConv: Conv[Unit] = Conv.ofType - override protected val valueConv: Conv[UserRecentEngagedTweets] = - ScroogeConv.fromStruct[UserRecentEngagedTweets] - - override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, DefaultVersion) - - override protected def toRawSignals( - userRecentEngagedTweets: UserRecentEngagedTweets - ): Seq[RawSignalType] = - userRecentEngagedTweets.recentEngagedTweets - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RawSignalType]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map { - _.map { signals => - val lookBackWindowFilteredSignals = - SignalFilter.lookBackWindow90DayFilter(signals, query.signalType) - lookBackWindowFilteredSignals - .collect { - case RecentEngagedTweet(tweetId, engagedAt, _: OriginalTweetMetadata, _) => - Signal(query.signalType, engagedAt, Some(InternalId.TweetId(tweetId))) - }.take(query.maxResults.getOrElse(Int.MaxValue)) - } - } - } - -} - -object OriginalTweetsFetcher { - // see com.twitter.twistly.store.UserRecentEngagedTweetsStore - private val DefaultVersion = 0 -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileClickFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileClickFetcher.scala deleted file mode 100644 index 1b93df59d..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileClickFetcher.scala +++ /dev/null @@ -1,98 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.simclusters_v2.common.UserId -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.strato.client.Client -import com.twitter.strato.data.Conv -import com.twitter.strato.thrift.ScroogeConv -import com.twitter.twistly.thriftscala.RecentProfileClickImpressEvents -import com.twitter.twistly.thriftscala.ProfileClickImpressEvent -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.base.StratoSignalFetcher -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class ProfileClickFetcher @Inject() ( - stratoClient: Client, - timer: Timer, - stats: StatsReceiver) - extends StratoSignalFetcher[(UserId, Long), Unit, RecentProfileClickImpressEvents] { - - import ProfileClickFetcher._ - - override type RawSignalType = ProfileClickImpressEvent - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(name) - - override val stratoColumnPath: String = stratoPath - override val stratoView: Unit = None - - override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType - override protected val viewConv: Conv[Unit] = Conv.ofType - override protected val valueConv: Conv[RecentProfileClickImpressEvents] = - ScroogeConv.fromStruct[RecentProfileClickImpressEvents] - - override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, defaultVersion) - - override protected def toRawSignals( - stratoValue: RecentProfileClickImpressEvents - ): Seq[ProfileClickImpressEvent] = { - stratoValue.events - } - - override def process( - query: Query, - rawSignals: Future[Option[Seq[ProfileClickImpressEvent]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map { events => - events - .map { clicks => - clicks - .filter(dwelltimeFilter(_, query.signalType)) - .map(signalFromProfileClick(_, query.signalType)) - .sortBy(-_.timestamp) - .take(query.maxResults.getOrElse(Int.MaxValue)) - } - } - } -} - -object ProfileClickFetcher { - - val stratoPath = "recommendations/twistly/userRecentProfileClickImpress" - private val defaultVersion = 0L - private val sec2millis: Int => Long = i => i * 1000L - private val minDwellTimeMap: Map[SignalType, Long] = Map( - SignalType.GoodProfileClick -> sec2millis(10), - SignalType.GoodProfileClick20s -> sec2millis(20), - SignalType.GoodProfileClick30s -> sec2millis(30), - SignalType.GoodProfileClickFiltered -> sec2millis(10), - SignalType.GoodProfileClick20sFiltered -> sec2millis(20), - SignalType.GoodProfileClick30sFiltered -> sec2millis(30), - ) - - def signalFromProfileClick( - profileClickImpressEvent: ProfileClickImpressEvent, - signalType: SignalType - ): Signal = { - Signal( - signalType, - profileClickImpressEvent.engagedAt, - Some(InternalId.UserId(profileClickImpressEvent.entityId)) - ) - } - - def dwelltimeFilter( - profileClickImpressEvent: ProfileClickImpressEvent, - signalType: SignalType - ): Boolean = { - val goodClickDwellTime = minDwellTimeMap(signalType) - profileClickImpressEvent.clickImpressEventMetadata.totalDwellTime >= goodClickDwellTime - } -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileVisitsFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileVisitsFetcher.scala deleted file mode 100644 index 1cb27261f..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileVisitsFetcher.scala +++ /dev/null @@ -1,143 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.bijection.Codec -import com.twitter.bijection.scrooge.BinaryScalaCodec -import com.twitter.dds.jobs.repeated_profile_visits.thriftscala.ProfileVisitSet -import com.twitter.dds.jobs.repeated_profile_visits.thriftscala.ProfileVisitorInfo -import com.twitter.experiments.general_metrics.thriftscala.IdType -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.storage.client.manhattan.kv.ManhattanKVClientMtlsParams -import com.twitter.storehaus_internal.manhattan.Apollo -import com.twitter.storehaus_internal.manhattan.ManhattanCluster -import com.twitter.twistly.common.UserId -import com.twitter.usersignalservice.base.ManhattanSignalFetcher -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -case class ProfileVisitMetadata( - targetId: Option[Long], - totalTargetVisitsInLast14Days: Option[Int], - totalTargetVisitsInLast90Days: Option[Int], - totalTargetVisitsInLast180Days: Option[Int], - latestTargetVisitTimestampInLast90Days: Option[Long]) - -@Singleton -case class ProfileVisitsFetcher @Inject() ( - manhattanKVClientMtlsParams: ManhattanKVClientMtlsParams, - timer: Timer, - stats: StatsReceiver) - extends ManhattanSignalFetcher[ProfileVisitorInfo, ProfileVisitSet] { - import ProfileVisitsFetcher._ - - override type RawSignalType = ProfileVisitMetadata - - override val manhattanAppId: String = MHAppId - override val manhattanDatasetName: String = MHDatasetName - override val manhattanClusterId: ManhattanCluster = Apollo - override val manhattanKeyCodec: Codec[ProfileVisitorInfo] = BinaryScalaCodec(ProfileVisitorInfo) - override val manhattanRawSignalCodec: Codec[ProfileVisitSet] = BinaryScalaCodec(ProfileVisitSet) - - override protected def toManhattanKey(userId: UserId): ProfileVisitorInfo = - ProfileVisitorInfo(userId, IdType.User) - - override protected def toRawSignals(manhattanValue: ProfileVisitSet): Seq[ProfileVisitMetadata] = - manhattanValue.profileVisitSet - .map { - _.collect { - // only keep the Non-NSFW and not-following profile visits - case profileVisit - if profileVisit.targetId.nonEmpty - // The below check covers 180 days, not only 90 days as the name implies. - // See comment on [[ProfileVisit.latestTargetVisitTimestampInLast90Days]] thrift. - && profileVisit.latestTargetVisitTimestampInLast90Days.nonEmpty - && !profileVisit.isTargetNSFW.getOrElse(false) - && !profileVisit.doesSourceIdFollowTargetId.getOrElse(false) => - ProfileVisitMetadata( - targetId = profileVisit.targetId, - totalTargetVisitsInLast14Days = profileVisit.totalTargetVisitsInLast14Days, - totalTargetVisitsInLast90Days = profileVisit.totalTargetVisitsInLast90Days, - totalTargetVisitsInLast180Days = profileVisit.totalTargetVisitsInLast180Days, - latestTargetVisitTimestampInLast90Days = - profileVisit.latestTargetVisitTimestampInLast90Days - ) - }.toSeq - }.getOrElse(Seq.empty) - - override val name: String = this.getClass.getCanonicalName - - override val statsReceiver: StatsReceiver = stats.scope(name) - - override def process( - query: Query, - rawSignals: Future[Option[Seq[ProfileVisitMetadata]]] - ): Future[Option[Seq[Signal]]] = rawSignals.map { profiles => - profiles - .map { - _.filter(profileVisitMetadata => visitCountFilter(profileVisitMetadata, query.signalType)) - .sortBy(profileVisitMetadata => - -visitCountMap(query.signalType)(profileVisitMetadata).getOrElse(0)) - .map(profileVisitMetadata => - signalFromProfileVisit(profileVisitMetadata, query.signalType)) - .take(query.maxResults.getOrElse(Int.MaxValue)) - } - } -} - -object ProfileVisitsFetcher { - private val MHAppId = "repeated_profile_visits_aggregated" - private val MHDatasetName = "repeated_profile_visits_aggregated" - - private val minVisitCountMap: Map[SignalType, Int] = Map( - SignalType.RepeatedProfileVisit14dMinVisit2V1 -> 2, - SignalType.RepeatedProfileVisit14dMinVisit2V1NoNegative -> 2, - SignalType.RepeatedProfileVisit90dMinVisit6V1 -> 6, - SignalType.RepeatedProfileVisit90dMinVisit6V1NoNegative -> 6, - SignalType.RepeatedProfileVisit180dMinVisit6V1 -> 6, - SignalType.RepeatedProfileVisit180dMinVisit6V1NoNegative -> 6 - ) - - private val visitCountMap: Map[SignalType, ProfileVisitMetadata => Option[Int]] = Map( - SignalType.RepeatedProfileVisit14dMinVisit2V1 -> - ((profileVisitMetadata: ProfileVisitMetadata) => - profileVisitMetadata.totalTargetVisitsInLast14Days), - SignalType.RepeatedProfileVisit14dMinVisit2V1NoNegative -> - ((profileVisitMetadata: ProfileVisitMetadata) => - profileVisitMetadata.totalTargetVisitsInLast14Days), - SignalType.RepeatedProfileVisit90dMinVisit6V1 -> - ((profileVisitMetadata: ProfileVisitMetadata) => - profileVisitMetadata.totalTargetVisitsInLast90Days), - SignalType.RepeatedProfileVisit90dMinVisit6V1NoNegative -> - ((profileVisitMetadata: ProfileVisitMetadata) => - profileVisitMetadata.totalTargetVisitsInLast90Days), - SignalType.RepeatedProfileVisit180dMinVisit6V1 -> - ((profileVisitMetadata: ProfileVisitMetadata) => - profileVisitMetadata.totalTargetVisitsInLast180Days), - SignalType.RepeatedProfileVisit180dMinVisit6V1NoNegative -> - ((profileVisitMetadata: ProfileVisitMetadata) => - profileVisitMetadata.totalTargetVisitsInLast180Days) - ) - - def signalFromProfileVisit( - profileVisitMetadata: ProfileVisitMetadata, - signalType: SignalType - ): Signal = { - Signal( - signalType, - profileVisitMetadata.latestTargetVisitTimestampInLast90Days.get, - profileVisitMetadata.targetId.map(targetId => InternalId.UserId(targetId)) - ) - } - - def visitCountFilter( - profileVisitMetadata: ProfileVisitMetadata, - signalType: SignalType - ): Boolean = { - visitCountMap(signalType)(profileVisitMetadata).exists(_ >= minVisitCountMap(signalType)) - } -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RealGraphOonFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RealGraphOonFetcher.scala deleted file mode 100644 index ad5cc4f4b..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RealGraphOonFetcher.scala +++ /dev/null @@ -1,70 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.simclusters_v2.common.UserId -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.strato.client.Client -import com.twitter.strato.data.Conv -import com.twitter.strato.thrift.ScroogeConv -import com.twitter.usersignalservice.base.Query -import com.twitter.wtf.candidate.thriftscala.CandidateSeq -import com.twitter.wtf.candidate.thriftscala.Candidate -import com.twitter.usersignalservice.base.StratoSignalFetcher -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class RealGraphOonFetcher @Inject() ( - stratoClient: Client, - timer: Timer, - stats: StatsReceiver) - extends StratoSignalFetcher[UserId, Unit, CandidateSeq] { - import RealGraphOonFetcher._ - override type RawSignalType = Candidate - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(name) - - override val stratoColumnPath: String = RealGraphOonFetcher.stratoColumnPath - override val stratoView: Unit = None - - override protected val keyConv: Conv[UserId] = Conv.ofType - override protected val viewConv: Conv[Unit] = Conv.ofType - override protected val valueConv: Conv[CandidateSeq] = - ScroogeConv.fromStruct[CandidateSeq] - - override protected def toStratoKey(userId: UserId): UserId = userId - - override protected def toRawSignals( - realGraphOonCandidates: CandidateSeq - ): Seq[RawSignalType] = realGraphOonCandidates.candidates - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RawSignalType]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals - .map { - _.map( - _.sortBy(-_.score) - .collect { - case c if c.score >= MinRgScore => - Signal( - SignalType.RealGraphOon, - RealGraphOonFetcher.DefaultTimestamp, - Some(InternalId.UserId(c.userId))) - }.take(query.maxResults.getOrElse(Int.MaxValue))) - } - } -} - -object RealGraphOonFetcher { - val stratoColumnPath = "recommendations/real_graph/realGraphScoresOon.User" - // quality threshold for real graph score - private val MinRgScore = 0.0 - // no timestamp for RealGraph Candidates, set default as 0L - private val DefaultTimestamp = 0L -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ReplyTweetsFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ReplyTweetsFetcher.scala deleted file mode 100644 index 7f84f41c9..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ReplyTweetsFetcher.scala +++ /dev/null @@ -1,70 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.simclusters_v2.common.UserId -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.strato.client.Client -import com.twitter.strato.data.Conv -import com.twitter.strato.thrift.ScroogeConv -import com.twitter.twistly.common.TwistlyProfile -import com.twitter.twistly.thriftscala.EngagementMetadata.ReplyTweetMetadata -import com.twitter.twistly.thriftscala.RecentEngagedTweet -import com.twitter.twistly.thriftscala.UserRecentEngagedTweets -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.base.StratoSignalFetcher -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class ReplyTweetsFetcher @Inject() ( - stratoClient: Client, - timer: Timer, - stats: StatsReceiver) - extends StratoSignalFetcher[(UserId, Long), Unit, UserRecentEngagedTweets] { - import ReplyTweetsFetcher._ - override type RawSignalType = RecentEngagedTweet - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(name) - - override val stratoColumnPath: String = - TwistlyProfile.TwistlyProdProfile.userRecentEngagedStorePath - override val stratoView: Unit = None - - override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType - override protected val viewConv: Conv[Unit] = Conv.ofType - override protected val valueConv: Conv[UserRecentEngagedTweets] = - ScroogeConv.fromStruct[UserRecentEngagedTweets] - - override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, DefaultVersion) - - override protected def toRawSignals( - userRecentEngagedTweets: UserRecentEngagedTweets - ): Seq[RawSignalType] = - userRecentEngagedTweets.recentEngagedTweets - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RawSignalType]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map { - _.map { signals => - val lookBackWindowFilteredSignals = - SignalFilter.lookBackWindow90DayFilter(signals, query.signalType) - lookBackWindowFilteredSignals - .collect { - case RecentEngagedTweet(tweetId, engagedAt, _: ReplyTweetMetadata, _) => - Signal(query.signalType, engagedAt, Some(InternalId.TweetId(tweetId))) - }.take(query.maxResults.getOrElse(Int.MaxValue)) - } - } - } - -} - -object ReplyTweetsFetcher { - // see com.twitter.twistly.store.UserRecentEngagedTweetsStore - private val DefaultVersion = 0 -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RetweetsFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RetweetsFetcher.scala deleted file mode 100644 index 4b81c8d0b..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RetweetsFetcher.scala +++ /dev/null @@ -1,74 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.simclusters_v2.common.UserId -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.strato.client.Client -import com.twitter.strato.data.Conv -import com.twitter.strato.thrift.ScroogeConv -import com.twitter.twistly.common.TwistlyProfile -import com.twitter.twistly.thriftscala.EngagementMetadata.RetweetMetadata -import com.twitter.twistly.thriftscala.RecentEngagedTweet -import com.twitter.twistly.thriftscala.UserRecentEngagedTweets -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.base.StratoSignalFetcher -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class RetweetsFetcher @Inject() ( - stratoClient: Client, - timer: Timer, - stats: StatsReceiver) - extends StratoSignalFetcher[(UserId, Long), Unit, UserRecentEngagedTweets] { - import RetweetsFetcher._ - override type RawSignalType = RecentEngagedTweet - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(name) - - override val stratoColumnPath: String = - TwistlyProfile.TwistlyProdProfile.userRecentEngagedStorePath - override val stratoView: Unit = None - - override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType - override protected val viewConv: Conv[Unit] = Conv.ofType - override protected val valueConv: Conv[UserRecentEngagedTweets] = - ScroogeConv.fromStruct[UserRecentEngagedTweets] - - override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, DefaultVersion) - - override protected def toRawSignals( - userRecentEngagedTweets: UserRecentEngagedTweets - ): Seq[RawSignalType] = - userRecentEngagedTweets.recentEngagedTweets - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RawSignalType]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map { - _.map { signals => - val lookBackWindowFilteredSignals = - SignalFilter.lookBackWindow90DayFilter(signals, query.signalType) - lookBackWindowFilteredSignals - .filter { recentEngagedTweet => - recentEngagedTweet.features.statusCounts - .flatMap(_.favoriteCount).exists(_ >= MinFavCount) - }.collect { - case RecentEngagedTweet(tweetId, engagedAt, _: RetweetMetadata, _) => - Signal(query.signalType, engagedAt, Some(InternalId.TweetId(tweetId))) - }.take(query.maxResults.getOrElse(Int.MaxValue)) - } - } - } - -} - -object RetweetsFetcher { - private val MinFavCount = 10 - // see com.twitter.twistly.store.UserRecentEngagedTweetsStore - private val DefaultVersion = 0 -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/SignalFilter.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/SignalFilter.scala deleted file mode 100644 index 01be88a26..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/SignalFilter.scala +++ /dev/null @@ -1,48 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.twistly.thriftscala.EngagementMetadata.FavoriteMetadata -import com.twitter.twistly.thriftscala.RecentEngagedTweet -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Time - -// Shared Logic for filtering signal across different signal types -object SignalFilter { - - final val LookBackWindow90DayFilterEnabledSignalTypes: Set[SignalType] = Set( - SignalType.TweetFavorite90dV2, - SignalType.Retweet90dV2, - SignalType.OriginalTweet90dV2, - SignalType.Reply90dV2) - - /* Raw Signal Filter for TweetFavorite, Retweet, Original Tweet and Reply - * Filter out all raw signal if the most recent {Tweet Favorite + Retweet + Original Tweet + Reply} - * is older than 90 days. - * The filter is shared across 4 signal types as they are stored in the same physical store - * thus sharing the same TTL - * */ - def lookBackWindow90DayFilter( - signals: Seq[RecentEngagedTweet], - querySignalType: SignalType - ): Seq[RecentEngagedTweet] = { - if (LookBackWindow90DayFilterEnabledSignalTypes.contains( - querySignalType) && !isMostRecentSignalWithin90Days(signals.head)) { - Seq.empty - } else signals - } - - private def isMostRecentSignalWithin90Days( - signal: RecentEngagedTweet - ): Boolean = { - val diff = Time.now - Time.fromMilliseconds(signal.engagedAt) - diff.inDays <= 90 - } - - def isPromotedTweet(signal: RecentEngagedTweet): Boolean = { - signal match { - case RecentEngagedTweet(_, _, metadata: FavoriteMetadata, _) => - metadata.favoriteMetadata.isAd.getOrElse(false) - case _ => false - } - } - -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetClickFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetClickFetcher.scala deleted file mode 100644 index 19462a4e2..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetClickFetcher.scala +++ /dev/null @@ -1,94 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.simclusters_v2.common.UserId -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.strato.client.Client -import com.twitter.strato.data.Conv -import com.twitter.strato.thrift.ScroogeConv -import com.twitter.twistly.thriftscala.RecentTweetClickImpressEvents -import com.twitter.twistly.thriftscala.TweetClickImpressEvent -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.base.StratoSignalFetcher -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class TweetClickFetcher @Inject() ( - stratoClient: Client, - timer: Timer, - stats: StatsReceiver) - extends StratoSignalFetcher[(UserId, Long), Unit, RecentTweetClickImpressEvents] { - - import TweetClickFetcher._ - - override type RawSignalType = TweetClickImpressEvent - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(name) - - override val stratoColumnPath: String = stratoPath - override val stratoView: Unit = None - - override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType - override protected val viewConv: Conv[Unit] = Conv.ofType - override protected val valueConv: Conv[RecentTweetClickImpressEvents] = - ScroogeConv.fromStruct[RecentTweetClickImpressEvents] - - override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, defaultVersion) - - override protected def toRawSignals( - stratoValue: RecentTweetClickImpressEvents - ): Seq[TweetClickImpressEvent] = { - stratoValue.events - } - - override def process( - query: Query, - rawSignals: Future[Option[Seq[TweetClickImpressEvent]]] - ): Future[Option[Seq[Signal]]] = - rawSignals.map { events => - events.map { clicks => - clicks - .filter(dwelltimeFilter(_, query.signalType)) - .map(signalFromTweetClick(_, query.signalType)) - .sortBy(-_.timestamp) - .take(query.maxResults.getOrElse(Int.MaxValue)) - } - } -} - -object TweetClickFetcher { - - val stratoPath = "recommendations/twistly/userRecentTweetClickImpress" - private val defaultVersion = 0L - - private val minDwellTimeMap: Map[SignalType, Long] = Map( - SignalType.GoodTweetClick -> 2 * 1000L, - SignalType.GoodTweetClick5s -> 5 * 1000L, - SignalType.GoodTweetClick10s -> 10 * 1000L, - SignalType.GoodTweetClick30s -> 30 * 1000L, - ) - - def signalFromTweetClick( - tweetClickImpressEvent: TweetClickImpressEvent, - signalType: SignalType - ): Signal = { - Signal( - signalType, - tweetClickImpressEvent.engagedAt, - Some(InternalId.TweetId(tweetClickImpressEvent.entityId)) - ) - } - - def dwelltimeFilter( - tweetClickImpressEvent: TweetClickImpressEvent, - signalType: SignalType - ): Boolean = { - val goodClickDwellTime = minDwellTimeMap(signalType) - tweetClickImpressEvent.clickImpressEventMetadata.totalDwellTime >= goodClickDwellTime - } -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetFavoritesFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetFavoritesFetcher.scala deleted file mode 100644 index b427f722f..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetFavoritesFetcher.scala +++ /dev/null @@ -1,86 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.simclusters_v2.common.UserId -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.strato.client.Client -import com.twitter.strato.data.Conv -import com.twitter.strato.thrift.ScroogeConv -import com.twitter.twistly.common.TwistlyProfile -import com.twitter.twistly.thriftscala.EngagementMetadata.FavoriteMetadata -import com.twitter.twistly.thriftscala.RecentEngagedTweet -import com.twitter.twistly.thriftscala.UserRecentEngagedTweets -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.base.StratoSignalFetcher -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class TweetFavoritesFetcher @Inject() ( - stratoClient: Client, - timer: Timer, - stats: StatsReceiver) - extends StratoSignalFetcher[(UserId, Long), Unit, UserRecentEngagedTweets] { - import TweetFavoritesFetcher._ - override type RawSignalType = RecentEngagedTweet - override val name: String = this.getClass.getCanonicalName - override val statsReceiver: StatsReceiver = stats.scope(name) - - override val stratoColumnPath: String = - TwistlyProfile.TwistlyProdProfile.userRecentEngagedStorePath - override val stratoView: Unit = None - - override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType - override protected val viewConv: Conv[Unit] = Conv.ofType - override protected val valueConv: Conv[UserRecentEngagedTweets] = - ScroogeConv.fromStruct[UserRecentEngagedTweets] - - override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, DefaultVersion) - - override protected def toRawSignals( - userRecentEngagedTweets: UserRecentEngagedTweets - ): Seq[RawSignalType] = - userRecentEngagedTweets.recentEngagedTweets - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RawSignalType]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map { - _.map { signals => - val lookBackWindowFilteredSignals = - SignalFilter.lookBackWindow90DayFilter(signals, query.signalType) - lookBackWindowFilteredSignals - .filter { recentEngagedTweet => - recentEngagedTweet.features.statusCounts - .flatMap(_.favoriteCount).exists(_ >= MinFavCount) - }.filter { recentEngagedTweet => - applySignalTweetTypeFilter(query.signalType, recentEngagedTweet) - }.collect { - case RecentEngagedTweet(tweetId, engagedAt, _: FavoriteMetadata, _) => - Signal(query.signalType, engagedAt, Some(InternalId.TweetId(tweetId))) - }.take(query.maxResults.getOrElse(Int.MaxValue)) - } - } - } - private def applySignalTweetTypeFilter( - signal: SignalType, - recentEngagedTweet: RecentEngagedTweet - ): Boolean = { - // Perform specific filters for particular signal types. - signal match { - case SignalType.AdFavorite => SignalFilter.isPromotedTweet(recentEngagedTweet) - case _ => true - } - } -} - -object TweetFavoritesFetcher { - private val MinFavCount = 10 - // see com.twitter.twistly.store.UserRecentEngagedTweetsStore - private val DefaultVersion = 0 -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetSharesFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetSharesFetcher.scala deleted file mode 100644 index 6205e1bc3..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetSharesFetcher.scala +++ /dev/null @@ -1,77 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.bijection.Codec -import com.twitter.bijection.scrooge.BinaryScalaCodec -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.onboarding.relevance.tweet_engagement.thriftscala.EngagementIdentifier -import com.twitter.onboarding.relevance.tweet_engagement.thriftscala.TweetEngagement -import com.twitter.onboarding.relevance.tweet_engagement.thriftscala.TweetEngagements -import com.twitter.scalding_internal.multiformat.format.keyval.KeyValInjection.Long2BigEndian -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.storage.client.manhattan.kv.ManhattanKVClientMtlsParams -import com.twitter.storehaus_internal.manhattan.Apollo -import com.twitter.storehaus_internal.manhattan.ManhattanCluster -import com.twitter.twistly.common.UserId -import com.twitter.usersignalservice.base.ManhattanSignalFetcher -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Future -import com.twitter.util.Timer -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class TweetSharesFetcher @Inject() ( - manhattanKVClientMtlsParams: ManhattanKVClientMtlsParams, - timer: Timer, - stats: StatsReceiver) - extends ManhattanSignalFetcher[Long, TweetEngagements] { - - import TweetSharesFetcher._ - - override type RawSignalType = TweetEngagement - - override def name: String = this.getClass.getCanonicalName - - override def statsReceiver: StatsReceiver = stats.scope(name) - - override protected def manhattanAppId: String = MHAppId - - override protected def manhattanDatasetName: String = MHDatasetName - - override protected def manhattanClusterId: ManhattanCluster = Apollo - - override protected def manhattanKeyCodec: Codec[Long] = Long2BigEndian - - override protected def manhattanRawSignalCodec: Codec[TweetEngagements] = BinaryScalaCodec( - TweetEngagements) - - override protected def toManhattanKey(userId: UserId): Long = userId - - override protected def toRawSignals( - manhattanValue: TweetEngagements - ): Seq[TweetEngagement] = manhattanValue.tweetEngagements - - override def process( - query: Query, - rawSignals: Future[Option[Seq[TweetEngagement]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map { - _.map { - _.collect { - case tweetEngagement if (tweetEngagement.engagementType == EngagementIdentifier.Share) => - Signal( - SignalType.TweetShareV1, - tweetEngagement.timestampMs, - Some(InternalId.TweetId(tweetEngagement.tweetId))) - }.sortBy(-_.timestamp).take(query.maxResults.getOrElse(Int.MaxValue)) - } - } - } -} - -object TweetSharesFetcher { - private val MHAppId = "uss_prod_apollo" - private val MHDatasetName = "tweet_share_engagements" -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsPlayback50Fetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsPlayback50Fetcher.scala deleted file mode 100644 index 1577b2e99..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsPlayback50Fetcher.scala +++ /dev/null @@ -1,72 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.twistly.common.UserId -import com.twitter.twistly.thriftscala.UserRecentVideoViewTweets -import com.twitter.twistly.thriftscala.VideoViewEngagementType -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.util.Future -import com.twitter.util.Timer -import com.twitter.twistly.thriftscala.RecentVideoViewTweet -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.strato.client.Client -import com.twitter.strato.data.Conv -import com.twitter.strato.thrift.ScroogeConv -import com.twitter.usersignalservice.base.StratoSignalFetcher -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class VideoTweetsPlayback50Fetcher @Inject() ( - stratoClient: Client, - timer: Timer, - stats: StatsReceiver) - extends StratoSignalFetcher[ - (UserId, VideoViewEngagementType), - Unit, - UserRecentVideoViewTweets - ] { - import VideoTweetsPlayback50Fetcher._ - - override type RawSignalType = RecentVideoViewTweet - override def name: String = this.getClass.getCanonicalName - override def statsReceiver: StatsReceiver = stats.scope(name) - - override val stratoColumnPath: String = StratoColumn - override val stratoView: Unit = None - override protected val keyConv: Conv[(UserId, VideoViewEngagementType)] = Conv.ofType - override protected val viewConv: Conv[Unit] = Conv.ofType - override protected val valueConv: Conv[UserRecentVideoViewTweets] = - ScroogeConv.fromStruct[UserRecentVideoViewTweets] - - override protected def toStratoKey(userId: UserId): (UserId, VideoViewEngagementType) = - (userId, VideoViewEngagementType.VideoPlayback50) - - override protected def toRawSignals( - stratoValue: UserRecentVideoViewTweets - ): Seq[RecentVideoViewTweet] = stratoValue.recentEngagedTweets - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RecentVideoViewTweet]]] - ): Future[Option[Seq[Signal]]] = rawSignals.map { - _.map { - _.filter(videoView => - !videoView.isPromotedTweet && videoView.videoDurationSeconds >= MinVideoDurationSeconds) - .map { rawSignal => - Signal( - SignalType.VideoView90dPlayback50V1, - rawSignal.engagedAt, - Some(InternalId.TweetId(rawSignal.tweetId))) - }.take(query.maxResults.getOrElse(Int.MaxValue)) - } - } - -} - -object VideoTweetsPlayback50Fetcher { - private val StratoColumn = "recommendations/twistly/userRecentVideoViewTweetEngagements" - private val MinVideoDurationSeconds = 10 -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsQualityViewFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsQualityViewFetcher.scala deleted file mode 100644 index d513b978c..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsQualityViewFetcher.scala +++ /dev/null @@ -1,72 +0,0 @@ -package com.twitter.usersignalservice.signals - -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.twistly.common.UserId -import com.twitter.twistly.thriftscala.UserRecentVideoViewTweets -import com.twitter.twistly.thriftscala.VideoViewEngagementType -import com.twitter.usersignalservice.base.Query -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.util.Future -import com.twitter.util.Timer -import com.twitter.twistly.thriftscala.RecentVideoViewTweet -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.strato.client.Client -import com.twitter.strato.data.Conv -import com.twitter.strato.thrift.ScroogeConv -import com.twitter.usersignalservice.base.StratoSignalFetcher -import javax.inject.Inject -import javax.inject.Singleton - -@Singleton -case class VideoTweetsQualityViewFetcher @Inject() ( - stratoClient: Client, - timer: Timer, - stats: StatsReceiver) - extends StratoSignalFetcher[ - (UserId, VideoViewEngagementType), - Unit, - UserRecentVideoViewTweets - ] { - import VideoTweetsQualityViewFetcher._ - override type RawSignalType = RecentVideoViewTweet - override def name: String = this.getClass.getCanonicalName - override def statsReceiver: StatsReceiver = stats.scope(name) - - override val stratoColumnPath: String = StratoColumn - override val stratoView: Unit = None - override protected val keyConv: Conv[(UserId, VideoViewEngagementType)] = Conv.ofType - override protected val viewConv: Conv[Unit] = Conv.ofType - override protected val valueConv: Conv[UserRecentVideoViewTweets] = - ScroogeConv.fromStruct[UserRecentVideoViewTweets] - - override protected def toStratoKey(userId: UserId): (UserId, VideoViewEngagementType) = - (userId, VideoViewEngagementType.VideoQualityView) - - override protected def toRawSignals( - stratoValue: UserRecentVideoViewTweets - ): Seq[RecentVideoViewTweet] = stratoValue.recentEngagedTweets - - override def process( - query: Query, - rawSignals: Future[Option[Seq[RecentVideoViewTweet]]] - ): Future[Option[Seq[Signal]]] = { - rawSignals.map { - _.map { - _.filter(videoView => - !videoView.isPromotedTweet && videoView.videoDurationSeconds >= MinVideoDurationSeconds) - .map { rawSignal => - Signal( - SignalType.VideoView90dQualityV1, - rawSignal.engagedAt, - Some(InternalId.TweetId(rawSignal.tweetId))) - }.take(query.maxResults.getOrElse(Int.MaxValue)) - } - } - } -} - -object VideoTweetsQualityViewFetcher { - private val StratoColumn = "recommendations/twistly/userRecentVideoViewTweetEngagements" - private val MinVideoDurationSeconds = 10 -} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/BUILD deleted file mode 100644 index baca538b0..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/BUILD +++ /dev/null @@ -1,15 +0,0 @@ -scala_library( - compiler_option_sets = ["fatal_warnings"], - tags = ["bazel-compatible"], - dependencies = [ - "hermit/hermit-core/src/main/scala/com/twitter/hermit/predicate/socialgraph", - "src/scala/com/twitter/simclusters_v2/common", - "src/scala/com/twitter/twistly/common", - "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala", - "src/thrift/com/twitter/socialgraph:thrift-scala", - "user-signal-service/thrift/src/main/thrift:thrift-scala", - "util/util-core:util-core-util", - "util/util-core/src/main/java/com/twitter/util", - "util/util-stats/src/main/scala/com/twitter/finagle/stats", - ], -) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/SGSUtils.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/SGSUtils.scala deleted file mode 100644 index 01fbd8f38..000000000 --- a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/SGSUtils.scala +++ /dev/null @@ -1,59 +0,0 @@ -package com.twitter.usersignalservice.signals -package common - -import com.twitter.simclusters_v2.thriftscala.InternalId -import com.twitter.socialgraph.thriftscala.EdgesRequest -import com.twitter.socialgraph.thriftscala.EdgesResult -import com.twitter.socialgraph.thriftscala.PageRequest -import com.twitter.socialgraph.thriftscala.RelationshipType -import com.twitter.socialgraph.thriftscala.SocialGraphService -import com.twitter.socialgraph.thriftscala.SrcRelationship -import com.twitter.twistly.common.UserId -import com.twitter.usersignalservice.thriftscala.Signal -import com.twitter.usersignalservice.thriftscala.SignalType -import com.twitter.util.Duration -import com.twitter.util.Future -import com.twitter.util.Time - -object SGSUtils { - val MaxNumSocialGraphSignals = 200 - val MaxAge: Duration = Duration.fromDays(90) - - def getSGSRawSignals( - userId: UserId, - sgsClient: SocialGraphService.MethodPerEndpoint, - relationshipType: RelationshipType, - signalType: SignalType, - ): Future[Option[Seq[Signal]]] = { - val edgeRequest = EdgesRequest( - relationship = SrcRelationship(userId, relationshipType), - pageRequest = Some(PageRequest(count = None)) - ) - val now = Time.now.inMilliseconds - - sgsClient - .edges(Seq(edgeRequest)) - .map { sgsEdges => - sgsEdges.flatMap { - case EdgesResult(edges, _, _) => - edges.collect { - case edge if edge.createdAt >= now - MaxAge.inMilliseconds => - Signal( - signalType, - timestamp = edge.createdAt, - targetInternalId = Some(InternalId.UserId(edge.target))) - } - } - } - .map { signals => - signals - .take(MaxNumSocialGraphSignals) - .groupBy(_.targetInternalId) - .mapValues(_.maxBy(_.timestamp)) - .values - .toSeq - .sortBy(-_.timestamp) - } - .map(Some(_)) - } -} diff --git a/user-signal-service/thrift/src/main/thrift/BUILD b/user-signal-service/thrift/src/main/thrift/BUILD deleted file mode 100644 index faab4af7e..000000000 --- a/user-signal-service/thrift/src/main/thrift/BUILD +++ /dev/null @@ -1,20 +0,0 @@ -create_thrift_libraries( - base_name = "thrift", - sources = [ - "client_identifier.thrift", - "service.thrift", - "signal.thrift", - ], - platform = "java8", - tags = ["bazel-compatible"], - dependency_roots = [ - "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift", - ], - generate_languages = [ - "java", - "scala", - "strato", - ], - provides_java_name = "uss-thrift-java", - provides_scala_name = "uss-thrift-scala", -) diff --git a/user-signal-service/thrift/src/main/thrift/client_identifier.thrift b/user-signal-service/thrift/src/main/thrift/client_identifier.thrift deleted file mode 100644 index c953e6b8f..000000000 --- a/user-signal-service/thrift/src/main/thrift/client_identifier.thrift +++ /dev/null @@ -1,22 +0,0 @@ -namespace java com.twitter.usersignalservice.thriftjava -namespace py gen.twitter.usersignalservice.service -#@namespace scala com.twitter.usersignalservice.thriftscala -#@namespace strato com.twitter.usersignalservice.strato - -# ClientIdentifier should be defined as ServiceId_Product -enum ClientIdentifier { - # reserve 1-10 for CrMixer - CrMixer_Home = 1 - CrMixer_Notifications = 2 - CrMixer_Email = 3 - # reserve 11-20 for RSX - RepresentationScorer_Home = 11 - RepresentationScorer_Notifications = 12 - - # reserve 21-30 for Explore - ExploreRanker = 21 - - # We will throw an exception after we make sure all clients are sending the - # ClientIdentifier in their request. - Unknown = 9999 -} diff --git a/user-signal-service/thrift/src/main/thrift/service.thrift b/user-signal-service/thrift/src/main/thrift/service.thrift deleted file mode 100644 index a10959ea8..000000000 --- a/user-signal-service/thrift/src/main/thrift/service.thrift +++ /dev/null @@ -1,23 +0,0 @@ -namespace java com.twitter.usersignalservice.thriftjava -namespace py gen.twitter.usersignalservice.service -#@namespace scala com.twitter.usersignalservice.thriftscala -#@namespace strato com.twitter.usersignalservice.strato - -include "signal.thrift" -include "client_identifier.thrift" - -struct SignalRequest { - 1: optional i64 maxResults - 2: required signal.SignalType signalType -} - -struct BatchSignalRequest { - 1: required i64 userId(personalDataType = "UserId") - 2: required list signalRequest - # make sure to populate the clientId, otherwise the service would throw exceptions - 3: optional client_identifier.ClientIdentifier clientId -}(hasPersonalData='true') - -struct BatchSignalResponse { - 1: required map> signalResponse -} diff --git a/user-signal-service/thrift/src/main/thrift/signal.thrift b/user-signal-service/thrift/src/main/thrift/signal.thrift deleted file mode 100644 index e32947be8..000000000 --- a/user-signal-service/thrift/src/main/thrift/signal.thrift +++ /dev/null @@ -1,113 +0,0 @@ -namespace java com.twitter.usersignalservice.thriftjava -namespace py gen.twitter.usersignalservice.signal -#@namespace scala com.twitter.usersignalservice.thriftscala -#@namespace strato com.twitter.usersignalservice.strato - -include "com/twitter/simclusters_v2/identifier.thrift" - - -enum SignalType { - /** - Please maintain the key space rule to avoid compatibility issue for the downstream production job - * Prod Key space: 0-1000 - * Devel Key space: 1000+ - **/ - - - /* tweet based signals */ - TweetFavorite = 0, // 540 Days Looback window - Retweet = 1, // 540 Days Lookback window - TrafficAttribution = 2, - OriginalTweet = 3, // 540 Days Looback window - Reply = 4, // 540 Days Looback window - /* Tweets that the user shared (sharer side) - * V1: successful shares (click share icon -> click in-app, or off-platform share option - * or copying link) - * */ - TweetShare_V1 = 5, // 14 Days Lookback window - - TweetFavorite_90D_V2 = 6, // 90 Days Lookback window : tweet fav from user with recent engagement in the past 90 days - Retweet_90D_V2 = 7, // 90 Days Lookback window : retweet from user with recent engagement in the past 90 days - OriginalTweet_90D_V2 = 8, // 90 Days Lookback window : original tweet from user with recent engagement in the past 90 days - Reply_90D_V2 = 9,// 90 Days Lookback window : reply from user with recent engagement in the past 90 days - GoodTweetClick = 10,// GoodTweetCilick Signal : Dwell Time Threshold >=2s - - // video tweets that were watched (10s OR 95%) in the past 90 days, are not ads, and have >=10s video - VideoView_90D_Quality_V1 = 11 // 90 Days Lookback window - // video tweets that were watched 50% in the past 90 days, are not ads, and have >=10s video - VideoView_90D_Playback50_V1 = 12 // 90 Days Lookback window - - /* user based signals */ - AccountFollow = 100, // infinite lookback window - RepeatedProfileVisit_14D_MinVisit2_V1 = 101, - RepeatedProfileVisit_90D_MinVisit6_V1 = 102, - RepeatedProfileVisit_180D_MinVisit6_V1 = 109, - RepeatedProfileVisit_14D_MinVisit2_V1_No_Negative = 110, - RepeatedProfileVisit_90D_MinVisit6_V1_No_Negative = 111, - RepeatedProfileVisit_180D_MinVisit6_V1_No_Negative = 112, - RealGraphOon = 104, - TrafficAttributionProfile_30D_LastVisit = 105, - TrafficAttributionProfile_30D_DecayedVisit = 106, - TrafficAttributionProfile_30D_WeightedEventDecayedVisit = 107, - TrafficAttributionProfile_30D_DecayedVisit_WithoutAgathaFilter = 108, - GoodProfileClick = 120, // GoodTweetCilick Signal : Dwell Time Threshold >=10s - AdFavorite = 121, // Favorites filtered to ads TweetFavorite has both organic and ads Favs - - // AccountFollowWithDelay should only be used by high-traffic clients and has 1 min delay - AccountFollowWithDelay = 122, - - - /* notifications based signals */ - /* V1: notification clicks from past 90 days with negative events (reports, dislikes) being filtered */ - NotificationOpenAndClick_V1 = 200, - - /* - negative signals for filter - */ - NegativeEngagedTweetId = 901 // tweetId for all negative engagements - NegativeEngagedUserId = 902 // userId for all negative engagements - AccountBlock = 903, - AccountMute = 904, - // skip 905 - 906 for Account report abuse / report spam - // User clicked dont like from past 90 Days - TweetDontLike = 907 - // User clicked see fewer on the recommended tweet from past 90 Days - TweetSeeFewer = 908 - // User clicked on the "report tweet" option in the tweet caret dropdown menu from past 90 days - TweetReport = 909 - - /* - devel signals - use the num > 1000 to test out signals under development/ddg - put it back to the correct corresponding Key space (0-1000) before ship - */ - GoodTweetClick_5s = 1001,// GoodTweetCilick Signal : Dwell Time Threshold >=5s - GoodTweetClick_10s = 1002,// GoodTweetCilick Signal : Dwell Time Threshold >=10s - GoodTweetClick_30s = 1003,// GoodTweetCilick Signal : Dwell Time Threshold >=30s - - GoodProfileClick_20s = 1004,// GoodProfileClick Signal : Dwell Time Threshold >=20s - GoodProfileClick_30s = 1005,// GoodProfileClick Signal : Dwell Time Threshold >=30s - - GoodProfileClick_Filtered = 1006, // GoodProfileClick Signal filtered by blocks and mutes. - GoodProfileClick_20s_Filtered = 1007// GoodProfileClick Signal : Dwell Time Threshold >=20s, filtered byblocks and mutes. - GoodProfileClick_30s_Filtered = 1008,// GoodProfileClick Signal : Dwell Time Threshold >=30s, filtered by blocks and mutes. - - /* - Unified Signals - These signals are aimed to unify multiple signal fetches into a single response. - This might be a healthier way for our retrievals layer to run inference on. - */ - TweetBasedUnifiedUniformSignal = 1300 - TweetBasedUnifiedEngagementWeightedSignal = 1301 - TweetBasedUnifiedQualityWeightedSignal = 1302 - ProducerBasedUnifiedUniformSignal = 1303 - ProducerBasedUnifiedEngagementWeightedSignal = 1304 - ProducerBasedUnifiedQualityWeightedSignal = 1305 - -} - -struct Signal { - 1: required SignalType signalType - 2: required i64 timestamp - 3: optional identifier.InternalId targetInternalId -}