mirror of
https://github.com/twitter/the-algorithm.git
synced 2025-01-07 01:48:16 +01:00
Compare commits
3 Commits
b8386bc765
...
9ddf21a2e4
Author | SHA1 | Date | |
---|---|---|---|
|
9ddf21a2e4 | ||
|
f5b7e91faf | ||
|
5373e80e7c |
@ -134,7 +134,7 @@ public class RetrieveCardBatchedStage extends TwitterBaseStage
|
||||
}));
|
||||
}
|
||||
|
||||
private Future<Map<Long, Card2>> batchRetrieveURLs(Set<Long> keys) {
|
||||
private Future<Map<Long, Card2>> batchRetrieveURLs(List<Long> keys) {
|
||||
retrieveCardsTimer.start();
|
||||
totalTweets.increment(keys.size());
|
||||
|
||||
@ -145,7 +145,7 @@ public class RetrieveCardBatchedStage extends TwitterBaseStage
|
||||
|
||||
GetTweetsRequest request = new GetTweetsRequest()
|
||||
.setOptions(options)
|
||||
.setTweet_ids(new ArrayList<>(keys));
|
||||
.setTweet_ids(keys);
|
||||
|
||||
return tweetyPieService.get_tweets(request)
|
||||
.onFailure(throwable -> {
|
||||
|
@ -2,7 +2,7 @@ package com.twitter.search.ingester.pipeline.util;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
@ -20,7 +20,7 @@ public class BatchingClient<RQ, RP> {
|
||||
/**
|
||||
* Issue a request to the underlying store which supports batches of requests.
|
||||
*/
|
||||
Future<Map<RQ, RP>> batchGet(Set<RQ> requests);
|
||||
Future<Map<RQ, RP>> batchGet(List<RQ> requests);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -54,7 +54,7 @@ public class BatchingClient<RQ, RP> {
|
||||
}
|
||||
|
||||
private void maybeBatchCall(RQ request) {
|
||||
Set<RQ> frozenRequests;
|
||||
List<RQ> frozenRequests;
|
||||
synchronized (unsentRequests) {
|
||||
unsentRequests.add(request);
|
||||
if (unsentRequests.size() < batchSize) {
|
||||
@ -63,27 +63,25 @@ public class BatchingClient<RQ, RP> {
|
||||
|
||||
// Make a copy of requests so we can modify it inside executeBatchCall without additional
|
||||
// synchronization.
|
||||
frozenRequests = new HashSet<>(unsentRequests);
|
||||
frozenRequests = Arrays.asList((RQ[]) unsentRequests.toArray());
|
||||
unsentRequests.clear();
|
||||
}
|
||||
|
||||
executeBatchCall(frozenRequests);
|
||||
}
|
||||
|
||||
private void executeBatchCall(Set<RQ> requests) {
|
||||
private void executeBatchCall(List<RQ> requests) {
|
||||
batchClient.batchGet(requests)
|
||||
.onSuccess(responseMap -> {
|
||||
for (Map.Entry<RQ, RP> entry : responseMap.entrySet()) {
|
||||
Promise<RP> promise = promises.remove(entry.getKey());
|
||||
if (promise != null) {
|
||||
promise.become(Future.value(entry.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
Set<RQ> outstandingRequests = Sets.difference(requests, responseMap.keySet());
|
||||
for (RQ request : outstandingRequests) {
|
||||
for (RQ request : requests) {
|
||||
Promise<RP> promise = promises.remove(request);
|
||||
if (promise != null) {
|
||||
if (promise == null) {
|
||||
continue;
|
||||
}
|
||||
RP response = responseMap.get(request);
|
||||
if (response != null) {
|
||||
promise.become(Future.value(response));
|
||||
} else {
|
||||
promise.become(Future.exception(new ResponseNotReturnedException(request)));
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,6 @@ package com.twitter.search.ingester.pipeline.util;
|
||||
|
||||
public class ResponseNotReturnedException extends Exception {
|
||||
ResponseNotReturnedException(Object request) {
|
||||
super("Response not returned in batch for request: " + request);
|
||||
super("Response not returned in batch for request: " + request, null, false, false);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user