mirror of
https://github.com/twitter/the-algorithm.git
synced 2025-01-20 07:51:15 +01:00
Merge 832a5de75880bf6ccdc5b34e15f5aad86708521a into 72eda9a24f815f6d566818cbf8518138e29d83e9
This commit is contained in:
commit
9bc99f431f
@ -85,16 +85,13 @@ trait UnifiedGraphWriter[
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def initRecosHoseKafka(
|
private def initRecosHoseKafka(queue: java.util.Queue[Array[RecosHoseMessage]], queuelimit: Semaphore): Unit = {
|
||||||
queue: java.util.Queue[Array[RecosHoseMessage]],
|
|
||||||
queuelimit: Semaphore,
|
|
||||||
): Unit = {
|
|
||||||
try {
|
try {
|
||||||
consumers = (0 until consumerNum).map { index =>
|
val consumers = (0 until consumerNum).map { index =>
|
||||||
new ThreadSafeKafkaConsumerClient(
|
new ThreadSafeKafkaConsumerClient(kafkaConsumerBuilder.clientId(s"clientId-$index").enableAutoCommit(false).config)
|
||||||
kafkaConsumerBuilder.clientId(s"clientId-$index").enableAutoCommit(false).config)
|
|
||||||
}
|
}
|
||||||
processors = consumers.zipWithIndex.map {
|
|
||||||
|
val processors = consumers.zipWithIndex.map {
|
||||||
case (consumer, index) =>
|
case (consumer, index) =>
|
||||||
val bufferedWriter = BufferedEdgeCollector(bufferSize, queue, queuelimit, statsReceiver)
|
val bufferedWriter = BufferedEdgeCollector(bufferSize, queue, queuelimit, statsReceiver)
|
||||||
val processor = RecosEdgeProcessor(bufferedWriter)(statsReceiver)
|
val processor = RecosEdgeProcessor(bufferedWriter)(statsReceiver)
|
||||||
@ -112,23 +109,16 @@ trait UnifiedGraphWriter[
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.info(s"starting ${processors.size} recosKafka processors")
|
log.info(s"starting ${processors.size} recosKafka processors")
|
||||||
processors.foreach { processor =>
|
processors.foreach(_.start())
|
||||||
processor.start()
|
|
||||||
}
|
|
||||||
} catch {
|
} catch {
|
||||||
case e: Throwable =>
|
case e: Throwable =>
|
||||||
e.printStackTrace()
|
e.printStackTrace()
|
||||||
log.error(e, e.toString)
|
log.error(e, e.toString)
|
||||||
processors.foreach { processor =>
|
processors.foreach(_.close())
|
||||||
processor.close()
|
consumers.foreach(_.close())
|
||||||
}
|
|
||||||
processors = Seq.empty
|
|
||||||
consumers.foreach { consumer =>
|
|
||||||
consumer.close()
|
|
||||||
}
|
|
||||||
consumers = Seq.empty
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the graph writers,
|
* Initialize the graph writers,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user