Merge 832a5de75880bf6ccdc5b34e15f5aad86708521a into fb54d8b54984f89f7dba90a18e7c3048421464c3

This commit is contained in:
Rajiv Singh 2023-05-22 17:40:27 -05:00 committed by GitHub
commit 5c34138dab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -85,50 +85,40 @@ trait UnifiedGraphWriter[
} }
} }
private def initRecosHoseKafka( private def initRecosHoseKafka(queue: java.util.Queue[Array[RecosHoseMessage]], queuelimit: Semaphore): Unit = {
queue: java.util.Queue[Array[RecosHoseMessage]], try {
queuelimit: Semaphore, val consumers = (0 until consumerNum).map { index =>
): Unit = { new ThreadSafeKafkaConsumerClient(kafkaConsumerBuilder.clientId(s"clientId-$index").enableAutoCommit(false).config)
try {
consumers = (0 until consumerNum).map { index =>
new ThreadSafeKafkaConsumerClient(
kafkaConsumerBuilder.clientId(s"clientId-$index").enableAutoCommit(false).config)
}
processors = consumers.zipWithIndex.map {
case (consumer, index) =>
val bufferedWriter = BufferedEdgeCollector(bufferSize, queue, queuelimit, statsReceiver)
val processor = RecosEdgeProcessor(bufferedWriter)(statsReceiver)
AtLeastOnceProcessor[String, RecosHoseMessage](
s"recos-injector-kafka-$index",
hosename,
consumer,
processor.process,
maxPendingRequests = MaxPendingRequests * bufferSize,
workerThreads = ProcessorThreads,
commitIntervalMs = CommitIntervalMs,
statsReceiver = statsReceiver
)
}
log.info(s"starting ${processors.size} recosKafka processors")
processors.foreach { processor =>
processor.start()
}
} catch {
case e: Throwable =>
e.printStackTrace()
log.error(e, e.toString)
processors.foreach { processor =>
processor.close()
}
processors = Seq.empty
consumers.foreach { consumer =>
consumer.close()
}
consumers = Seq.empty
} }
val processors = consumers.zipWithIndex.map {
case (consumer, index) =>
val bufferedWriter = BufferedEdgeCollector(bufferSize, queue, queuelimit, statsReceiver)
val processor = RecosEdgeProcessor(bufferedWriter)(statsReceiver)
AtLeastOnceProcessor[String, RecosHoseMessage](
s"recos-injector-kafka-$index",
hosename,
consumer,
processor.process,
maxPendingRequests = MaxPendingRequests * bufferSize,
workerThreads = ProcessorThreads,
commitIntervalMs = CommitIntervalMs,
statsReceiver = statsReceiver
)
}
log.info(s"starting ${processors.size} recosKafka processors")
processors.foreach(_.start())
} catch {
case e: Throwable =>
e.printStackTrace()
log.error(e, e.toString)
processors.foreach(_.close())
consumers.foreach(_.close())
} }
}
/** /**
* Initialize the graph writers, * Initialize the graph writers,