Optimize code for initializing Kafka consumers and processors

Signed-off-by: Rajiv Ranjan Singh <rajivperfect007@gmail.com>
This commit is contained in:
Rajiv Ranjan Singh 2023-04-01 09:30:37 +05:30
parent ec83d01dca
commit 832a5de758

View File

@ -85,16 +85,13 @@ trait UnifiedGraphWriter[
} }
} }
private def initRecosHoseKafka( private def initRecosHoseKafka(queue: java.util.Queue[Array[RecosHoseMessage]], queuelimit: Semaphore): Unit = {
queue: java.util.Queue[Array[RecosHoseMessage]],
queuelimit: Semaphore,
): Unit = {
try { try {
consumers = (0 until consumerNum).map { index => val consumers = (0 until consumerNum).map { index =>
new ThreadSafeKafkaConsumerClient( new ThreadSafeKafkaConsumerClient(kafkaConsumerBuilder.clientId(s"clientId-$index").enableAutoCommit(false).config)
kafkaConsumerBuilder.clientId(s"clientId-$index").enableAutoCommit(false).config)
} }
processors = consumers.zipWithIndex.map {
val processors = consumers.zipWithIndex.map {
case (consumer, index) => case (consumer, index) =>
val bufferedWriter = BufferedEdgeCollector(bufferSize, queue, queuelimit, statsReceiver) val bufferedWriter = BufferedEdgeCollector(bufferSize, queue, queuelimit, statsReceiver)
val processor = RecosEdgeProcessor(bufferedWriter)(statsReceiver) val processor = RecosEdgeProcessor(bufferedWriter)(statsReceiver)
@ -112,23 +109,16 @@ trait UnifiedGraphWriter[
} }
log.info(s"starting ${processors.size} recosKafka processors") log.info(s"starting ${processors.size} recosKafka processors")
processors.foreach { processor => processors.foreach(_.start())
processor.start()
}
} catch { } catch {
case e: Throwable => case e: Throwable =>
e.printStackTrace() e.printStackTrace()
log.error(e, e.toString) log.error(e, e.toString)
processors.foreach { processor => processors.foreach(_.close())
processor.close() consumers.foreach(_.close())
}
processors = Seq.empty
consumers.foreach { consumer =>
consumer.close()
}
consumers = Seq.empty
}
} }
}
/** /**
* Initialize the graph writers, * Initialize the graph writers,