Compare commits

...

3 Commits

Author SHA1 Message Date
maik101010
2a8d8f8857
Merge 6c8a2cd9ad into fb54d8b549 2023-05-22 17:38:53 -05:00
michaelg
6c8a2cd9ad update: removed unused variable 2023-04-01 15:48:59 +01:00
michaelg
9bc72e82b2 update: removed unused variable 2023-04-01 15:45:02 +01:00

View File

@ -106,8 +106,6 @@ public class FeatureUpdateController implements FeatureUpdateService.ServiceIfac
@Override
public Future<FeatureUpdateResponse> process(FeatureUpdateRequest featureUpdate) {
long requestStartTimeMillis = clock.nowMillis();
// Export overall and per-client request rate stats
final String requestClientId;
if (featureUpdate.getRequestClientId() != null
@ -130,7 +128,7 @@ public class FeatureUpdateController implements FeatureUpdateService.ServiceIfac
}
ThriftIndexingEvent event = featureUpdate.getEvent();
return writeToKafka(event, requestStartTimeMillis)
return writeToKafka(event)
.map(responsesList -> {
stats.clientResponse(requestClientId, FeatureUpdateResponseCode.SUCCESS);
// only when both Realtime & RealtimeCG succeed, then it will return a success flag
@ -162,8 +160,7 @@ public class FeatureUpdateController implements FeatureUpdateService.ServiceIfac
* The FeatureUpdateResponse is more like an ACK message, and the upstream (feature update ingester)
* will not be affected much even if it failed (as long as the kafka message is written)
*/
private Future<List<BoxedUnit>> writeToKafka(ThriftIndexingEvent event,
long requestStartTimeMillis) {
private Future<List<BoxedUnit>> writeToKafka(ThriftIndexingEvent event) {
return Futures.collect(Lists.newArrayList(
writeToKafkaInternal(event, WRITE_TO_KAFKA_DECIDER_KEY, droppedKafkaUpdateEvents,
kafkaUpdateEventsTopicName, -1, kafkaProducer),