the-algorithm/ann/src/main/scala/com/twitter/ann/util/IndexBuilderUtils.scala
twitter-team ef4c5eb65e Twitter Recommendation Algorithm
Please note we have force-pushed a new initial commit in order to remove some publicly-available Twitter user information. Note that this process may be required in the future.
2023-03-31 17:36:31 -05:00

32 lines
957 B
Scala

package com.twitter.ann.util
import com.twitter.ann.common.{Appendable, EntityEmbedding}
import com.twitter.concurrent.AsyncStream
import com.twitter.logging.Logger
import com.twitter.util.Future
import java.util.concurrent.atomic.AtomicInteger
object IndexBuilderUtils {

  /** Logger used to report indexing progress. */
  val Log: Logger = Logger.apply()

  /** A progress line is logged each time this many appends have completed. */
  private val ProgressLogInterval = 10000

  /**
   * Appends every embedding in `embeddings` to `appendable`, keeping at most
   * `concurrencyLevel` append futures in flight at once.
   *
   * A progress message is logged every `ProgressLogInterval` successfully completed appends.
   *
   * @param appendable       index the embeddings are appended to
   * @param embeddings       embeddings to add
   * @param concurrencyLevel maximum number of concurrent `append` futures
   * @tparam T               entity id type of the embeddings
   * @return a Future holding the number of elements processed by the stream; an `append`
   *         failure presumably propagates as a failed Future (verify against AsyncStream docs)
   */
  def addToIndex[T](
    appendable: Appendable[T, _, _],
    embeddings: Seq[EntityEmbedding[T]],
    concurrencyLevel: Int
  ): Future[Int] = {
    val completed = new AtomicInteger()
    // `before` takes its argument by name; presumably used so that any exception thrown while
    // setting up the stream surfaces as a failed Future instead of a synchronous throw.
    Future.Unit.before {
      // AsyncStream.mapConcurrent processes at most `concurrencyLevel` futures at a time.
      val appends = AsyncStream.fromSeq(embeddings).mapConcurrent(concurrencyLevel) { embedding =>
        // Count on completion (not on submission) so the log reflects updates actually performed.
        appendable.append(embedding).onSuccess { _ =>
          val performed = completed.incrementAndGet()
          if (performed % ProgressLogInterval == 0) {
            Log.info(s"Performed $performed updates")
          }
        }
      }
      // Forcing `size` drives the whole stream to completion and yields its element count.
      appends.size
    }
  }
}