the-algorithm/src/scala/com/twitter/recos/user_tweet_entity_graph/UserTweetEntityGraphWriter....

package com.twitter.recos.user_tweet_entity_graph

import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finatra.kafka.consumers.FinagleKafkaConsumerBuilder
import com.twitter.graphjet.algorithms.{RecommendationType, TweetIDMask}
import com.twitter.graphjet.bipartite.NodeMetadataLeftIndexedMultiSegmentBipartiteGraph
import com.twitter.graphjet.bipartite.segment.NodeMetadataLeftIndexedBipartiteGraphSegment
import com.twitter.recos.hose.common.UnifiedGraphWriter
import com.twitter.recos.internal.thriftscala.RecosHoseMessage
import com.twitter.recos.serviceapi.Tweetypie._

/**
 * This class submits $numBootstrapWriters graph writer threads (BufferedEdgeWriter) at
 * service startup. One of them is the live writer thread, and the other
 * $(numBootstrapWriters - 1) are catch-up writer threads. All of them consume Kafka events
 * from an internal concurrent queue, which is populated by Kafka reader threads. At bootstrap
 * time, the Kafka reader threads rewind the Kafka offset to several hours in the past and
 * populate the internal concurrent queue. Each graph writer thread writes to its own graph
 * segment. The $(numBootstrapWriters - 1) catch-up writer threads stop once all events between
 * the current system time at startup and the timestamp stored in memcache have been processed.
 * The live writer thread keeps writing all incoming Kafka events and lives through the entire
 * life cycle of the recos graph service. (An illustrative construction sketch appears at the
 * end of this file.)
 */
case class UserTweetEntityGraphWriter(
  shardId: String,
  env: String,
  hosename: String,
  bufferSize: Int,
  kafkaConsumerBuilder: FinagleKafkaConsumerBuilder[String, RecosHoseMessage],
  clientId: String,
  statsReceiver: StatsReceiver)
    extends UnifiedGraphWriter[
      NodeMetadataLeftIndexedBipartiteGraphSegment,
      NodeMetadataLeftIndexedMultiSegmentBipartiteGraph
    ] {
  writer =>

  // The max throughput of each Kafka consumer is around 25 MB/s,
  // so use 4 consumers to reach roughly 100 MB/s of catch-up throughput.
  val consumerNum: Int = 4

  // Leave one segment to the live writer.
  val catchupWriterNum: Int = RecosConfig.maxNumSegments - 1

  // Placeholder metadata for the left (user) node; user nodes carry no metadata in this graph.
  private final val EMPTY_LEFT_NODE_METADATA = new Array[Array[Int]](1)

  /**
   * Adds a RecosHoseMessage to the graph. Used by the live writer to insert edges into the
   * current (live) segment.
   */
  override def addEdgeToGraph(
    graph: NodeMetadataLeftIndexedMultiSegmentBipartiteGraph,
    recosHoseMessage: RecosHoseMessage
  ): Unit = {
    graph.addEdge(
      recosHoseMessage.leftId,
      getMetaEdge(recosHoseMessage.rightId, recosHoseMessage.card),
      UserTweetEdgeTypeMask.actionTypeToEdgeType(recosHoseMessage.action),
      recosHoseMessage.edgeMetadata.getOrElse(0L),
      EMPTY_LEFT_NODE_METADATA,
      extractEntities(recosHoseMessage)
    )
  }
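
  // Illustration with hypothetical values: a RecosHoseMessage carrying leftId = userId,
  // rightId = tweetId, a photo card, and edgeMetadata = Some(ts) becomes the edge
  // userId -> TweetIDMask.photo(tweetId), typed via UserTweetEdgeTypeMask.actionTypeToEdgeType,
  // with ts stored as edge metadata. addEdgeToSegment below performs the same mapping, just
  // against an explicitly supplied (older) segment instead of the live graph.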

  /**
   * Adds a RecosHoseMessage to the given segment in the graph. Used by the catch-up writers to
   * insert edges into non-current (old) segments.
   */
  override def addEdgeToSegment(
    segment: NodeMetadataLeftIndexedBipartiteGraphSegment,
    recosHoseMessage: RecosHoseMessage
  ): Unit = {
    segment.addEdge(
      recosHoseMessage.leftId,
      getMetaEdge(recosHoseMessage.rightId, recosHoseMessage.card),
      UserTweetEdgeTypeMask.actionTypeToEdgeType(recosHoseMessage.action),
      recosHoseMessage.edgeMetadata.getOrElse(0L),
      EMPTY_LEFT_NODE_METADATA,
      extractEntities(recosHoseMessage)
    )
  }

  private def getMetaEdge(rightId: Long, cardOption: Option[Byte]): Long = {
    cardOption
      .map { card =>
        if (isPhotoCard(card)) TweetIDMask.photo(rightId)
        else if (isPlayerCard(card)) TweetIDMask.player(rightId)
        else if (isSummaryCard(card)) TweetIDMask.summary(rightId)
        else if (isPromotionCard(card)) TweetIDMask.promotion(rightId)
        else rightId
      }
      .getOrElse(rightId)
  }
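
  // Design note: TweetIDMask packs the card type into otherwise unused high-order bits of the
  // tweet id, so a single long edge value carries both the tweet id and its card/media type.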

  private def extractEntities(message: RecosHoseMessage): Array[Array[Int]] = {
    val entities: Array[Array[Int]] =
      new Array[Array[Int]](RecommendationType.METADATASIZE.getValue)
    message.entities.foreach {
      _.foreach {
        case (entityType, ids) =>
          entities.update(entityType, ids.toArray)
      }
    }
    entities
  }
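
  // Illustration with hypothetical values: for a message whose entities field is
  // Some(Map(entityType -> Seq(10, 11))), the returned array holds Array(10, 11) at index
  // entityType and null in every other slot, one slot per RecommendationType metadata kind.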
}
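
/**
 * A minimal construction sketch, not part of the original file: it only illustrates how the
 * case class above might be wired at service startup. All literal values, as well as the
 * pre-built `consumerBuilder` and `stats` arguments, are hypothetical placeholders; the real
 * wiring lives in the recos graph service startup code.
 */
object UserTweetEntityGraphWriterExample {
  def build(
    consumerBuilder: FinagleKafkaConsumerBuilder[String, RecosHoseMessage],
    stats: StatsReceiver
  ): UserTweetEntityGraphWriter =
    UserTweetEntityGraphWriter(
      shardId = "0",                          // hypothetical shard id
      env = "prod",                           // hypothetical environment name
      hosename = "recos_hose",                // hypothetical hose/topic name
      bufferSize = 1 << 14,                   // hypothetical internal queue size
      kafkaConsumerBuilder = consumerBuilder, // pre-built consumer builder (assumed)
      clientId = "user_tweet_entity_graph",   // hypothetical kafka client id
      statsReceiver = stats.scope("UserTweetEntityGraphWriter")
    )
}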