diff --git a/src/scala/com/twitter/recos/hose/common/UnifiedGraphWriter.scala b/src/scala/com/twitter/recos/hose/common/UnifiedGraphWriter.scala index bac62e418..74cc84844 100644 --- a/src/scala/com/twitter/recos/hose/common/UnifiedGraphWriter.scala +++ b/src/scala/com/twitter/recos/hose/common/UnifiedGraphWriter.scala @@ -85,50 +85,40 @@ trait UnifiedGraphWriter[ } } - private def initRecosHoseKafka( - queue: java.util.Queue[Array[RecosHoseMessage]], - queuelimit: Semaphore, - ): Unit = { - try { - consumers = (0 until consumerNum).map { index => - new ThreadSafeKafkaConsumerClient( - kafkaConsumerBuilder.clientId(s"clientId-$index").enableAutoCommit(false).config) - } - processors = consumers.zipWithIndex.map { - case (consumer, index) => - val bufferedWriter = BufferedEdgeCollector(bufferSize, queue, queuelimit, statsReceiver) - val processor = RecosEdgeProcessor(bufferedWriter)(statsReceiver) - - AtLeastOnceProcessor[String, RecosHoseMessage]( - s"recos-injector-kafka-$index", - hosename, - consumer, - processor.process, - maxPendingRequests = MaxPendingRequests * bufferSize, - workerThreads = ProcessorThreads, - commitIntervalMs = CommitIntervalMs, - statsReceiver = statsReceiver - ) - } - - log.info(s"starting ${processors.size} recosKafka processors") - processors.foreach { processor => - processor.start() - } - } catch { - case e: Throwable => - e.printStackTrace() - log.error(e, e.toString) - processors.foreach { processor => - processor.close() - } - processors = Seq.empty - consumers.foreach { consumer => - consumer.close() - } - consumers = Seq.empty +private def initRecosHoseKafka(queue: java.util.Queue[Array[RecosHoseMessage]], queuelimit: Semaphore): Unit = { + try { + val consumers = (0 until consumerNum).map { index => + new ThreadSafeKafkaConsumerClient(kafkaConsumerBuilder.clientId(s"clientId-$index").enableAutoCommit(false).config) } + + val processors = consumers.zipWithIndex.map { + case (consumer, index) => + val bufferedWriter = BufferedEdgeCollector(bufferSize, queue, queuelimit, statsReceiver) + val processor = RecosEdgeProcessor(bufferedWriter)(statsReceiver) + + AtLeastOnceProcessor[String, RecosHoseMessage]( + s"recos-injector-kafka-$index", + hosename, + consumer, + processor.process, + maxPendingRequests = MaxPendingRequests * bufferSize, + workerThreads = ProcessorThreads, + commitIntervalMs = CommitIntervalMs, + statsReceiver = statsReceiver + ) + } + + log.info(s"starting ${processors.size} recosKafka processors") + processors.foreach(_.start()) + + } catch { + case e: Throwable => + e.printStackTrace() + log.error(e, e.toString) + processors.foreach(_.close()) + consumers.foreach(_.close()) } +} /** * Initialize the graph writers,