mirror of
https://github.com/twitter/the-algorithm.git
synced 2025-01-05 09:01:54 +01:00
Merge f5b7e91faf
into 72eda9a24f
This commit is contained in:
commit
b8386bc765
@ -134,7 +134,7 @@ public class RetrieveCardBatchedStage extends TwitterBaseStage
|
|||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Future<Map<Long, Card2>> batchRetrieveURLs(Set<Long> keys) {
|
private Future<Map<Long, Card2>> batchRetrieveURLs(List<Long> keys) {
|
||||||
retrieveCardsTimer.start();
|
retrieveCardsTimer.start();
|
||||||
totalTweets.increment(keys.size());
|
totalTweets.increment(keys.size());
|
||||||
|
|
||||||
@ -145,7 +145,7 @@ public class RetrieveCardBatchedStage extends TwitterBaseStage
|
|||||||
|
|
||||||
GetTweetsRequest request = new GetTweetsRequest()
|
GetTweetsRequest request = new GetTweetsRequest()
|
||||||
.setOptions(options)
|
.setOptions(options)
|
||||||
.setTweet_ids(new ArrayList<>(keys));
|
.setTweet_ids(keys);
|
||||||
|
|
||||||
return tweetyPieService.get_tweets(request)
|
return tweetyPieService.get_tweets(request)
|
||||||
.onFailure(throwable -> {
|
.onFailure(throwable -> {
|
||||||
|
@ -2,7 +2,7 @@ package com.twitter.search.ingester.pipeline.util;
|
|||||||
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
@ -20,7 +20,7 @@ public class BatchingClient<RQ, RP> {
|
|||||||
/**
|
/**
|
||||||
* Issue a request to the underlying store which supports batches of requests.
|
* Issue a request to the underlying store which supports batches of requests.
|
||||||
*/
|
*/
|
||||||
Future<Map<RQ, RP>> batchGet(Set<RQ> requests);
|
Future<Map<RQ, RP>> batchGet(List<RQ> requests);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -54,7 +54,7 @@ public class BatchingClient<RQ, RP> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void maybeBatchCall(RQ request) {
|
private void maybeBatchCall(RQ request) {
|
||||||
Set<RQ> frozenRequests;
|
List<RQ> frozenRequests;
|
||||||
synchronized (unsentRequests) {
|
synchronized (unsentRequests) {
|
||||||
unsentRequests.add(request);
|
unsentRequests.add(request);
|
||||||
if (unsentRequests.size() < batchSize) {
|
if (unsentRequests.size() < batchSize) {
|
||||||
@ -63,27 +63,25 @@ public class BatchingClient<RQ, RP> {
|
|||||||
|
|
||||||
// Make a copy of requests so we can modify it inside executeBatchCall without additional
|
// Make a copy of requests so we can modify it inside executeBatchCall without additional
|
||||||
// synchronization.
|
// synchronization.
|
||||||
frozenRequests = new HashSet<>(unsentRequests);
|
frozenRequests = Arrays.asList((RQ[]) unsentRequests.toArray());
|
||||||
unsentRequests.clear();
|
unsentRequests.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
executeBatchCall(frozenRequests);
|
executeBatchCall(frozenRequests);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void executeBatchCall(Set<RQ> requests) {
|
private void executeBatchCall(List<RQ> requests) {
|
||||||
batchClient.batchGet(requests)
|
batchClient.batchGet(requests)
|
||||||
.onSuccess(responseMap -> {
|
.onSuccess(responseMap -> {
|
||||||
for (Map.Entry<RQ, RP> entry : responseMap.entrySet()) {
|
for (RQ request : requests) {
|
||||||
Promise<RP> promise = promises.remove(entry.getKey());
|
|
||||||
if (promise != null) {
|
|
||||||
promise.become(Future.value(entry.getValue()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Set<RQ> outstandingRequests = Sets.difference(requests, responseMap.keySet());
|
|
||||||
for (RQ request : outstandingRequests) {
|
|
||||||
Promise<RP> promise = promises.remove(request);
|
Promise<RP> promise = promises.remove(request);
|
||||||
if (promise != null) {
|
if (promise == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
RP response = responseMap.get(request);
|
||||||
|
if (response != null) {
|
||||||
|
promise.become(Future.value(response));
|
||||||
|
} else {
|
||||||
promise.become(Future.exception(new ResponseNotReturnedException(request)));
|
promise.become(Future.exception(new ResponseNotReturnedException(request)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,6 @@ package com.twitter.search.ingester.pipeline.util;
|
|||||||
|
|
||||||
public class ResponseNotReturnedException extends Exception {
|
public class ResponseNotReturnedException extends Exception {
|
||||||
ResponseNotReturnedException(Object request) {
|
ResponseNotReturnedException(Object request) {
|
||||||
super("Response not returned in batch for request: " + request);
|
super("Response not returned in batch for request: " + request, null, false, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user