Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduced response header copying #8760

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,10 @@ public class RxDocumentServiceResponse {
private final StoreResponse storeResponse;

public RxDocumentServiceResponse(StoreResponse response) {
String[] headerNames = response.getResponseHeaderNames();
String[] headerValues = response.getResponseHeaderValues();

this.headersMap = new HashMap<>(headerNames.length);
this.headersMap = response.getResponseHeaders();

// Gets status code.
this.statusCode = response.getStatus();

// Extracts headers.
for (int i = 0; i < headerNames.length; i++) {
this.headersMap.put(headerNames[i], headerValues[i]);
}

this.storeResponse = response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo

// transforms to Observable<StoreResponse>
StoreResponse rsp = new StoreResponse(httpResponseStatus,
HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()),
HttpUtils.unescape(httpResponseHeaders.toMap()),
content);
DirectBridgeInternal.setRequestTimeline(rsp, reactorNettyRequestRecord.takeTimelineSnapshot());
if (request.requestContext.cosmosResponseDiagnostics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,15 @@ public static String getDateHeader(Map<String, String> headerValues) {
return date != null ? date : StringUtils.EMPTY;
}

public static List<Entry<String, String>> unescape(Set<Entry<String, String>> headers) {
List<Entry<String, String>> result = new ArrayList<>(headers.size());
for (Entry<String, String> entry : headers) {
if (entry.getKey().equals(HttpConstants.HttpHeaders.OWNER_FULL_NAME)) {
String unescapedUrl = HttpUtils.urlDecode(entry.getValue());
entry = new AbstractMap.SimpleEntry<>(entry.getKey(), unescapedUrl);
public static Map<String, String> unescape(Map<String, String> headers) {
Map<String, String> result = new HashMap<>(headers.size());
for (Entry<String, String> entry : headers.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.equals(HttpConstants.HttpHeaders.OWNER_FULL_NAME)) {
value = HttpUtils.urlDecode(value);
}
result.add(entry);
result.put(key, value);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static Mono<StoreResponse> toStoreResponse(HttpResponse httpClientResponse, Http

return contentObservable.map(byteArrayContent -> {
// transforms to Mono<StoreResponse>
return new StoreResponse(httpClientResponse.statusCode(), HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()), byteArrayContent);
return new StoreResponse(httpClientResponse.statusCode(), HttpUtils.unescape(httpResponseHeaders.toMap()), byteArrayContent);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,8 @@ private void handleUnsuccessfulStoreResponse(RxDocumentServiceRequest request, C
private RxDocumentServiceResponse completeResponse(
StoreResponse storeResponse,
RxDocumentServiceRequest request) throws InternalServerErrorException {
if (storeResponse.getResponseHeaderNames().length != storeResponse.getResponseHeaderValues().length) {
throw new InternalServerErrorException(RMResources.InvalidBackendResponse);
}

Map<String, String> headers = new HashMap<>(storeResponse.getResponseHeaderNames().length);
for (int idx = 0; idx < storeResponse.getResponseHeaderNames().length; idx++) {
String name = storeResponse.getResponseHeaderNames()[idx];
String value = storeResponse.getResponseHeaderValues()[idx];

headers.put(name, value);
}
Map<String, String> headers = new HashMap<>(storeResponse.getResponseHeaders());

this.updateResponseHeader(request, headers);
this.captureSessionToken(request, headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,27 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map.Entry;
import java.util.Map;

/**
* Used internally to represents a response from the store.
*/
public class StoreResponse {
final static Logger LOGGER = LoggerFactory.getLogger(StoreResponse.class);
final private int status;
final private String[] responseHeaderNames;
final private String[] responseHeaderValues;
final private Map<String, String> responseHeaders;
final private byte[] content;

private CosmosResponseDiagnostics cosmosResponseDiagnostics;
private RequestTimeline requestTimeline;

public StoreResponse(
int status,
List<Entry<String, String>> headerEntries,
Map<String, String> headerEntries,
byte[] content) {

requestTimeline = RequestTimeline.empty();
responseHeaderNames = new String[headerEntries.size()];
responseHeaderValues = new String[headerEntries.size()];

int i = 0;

for(Entry<String, String> headerEntry: headerEntries) {
responseHeaderNames[i] = headerEntry.getKey();
responseHeaderValues[i] = headerEntry.getValue();
i++;
}
responseHeaders = headerEntries;

this.status = status;
this.content = content;
Expand All @@ -53,12 +40,8 @@ public int getStatus() {
return status;
}

public String[] getResponseHeaderNames() {
return responseHeaderNames;
}

public String[] getResponseHeaderValues() {
return responseHeaderValues;
public Map<String, String> getResponseHeaders() {
return this.responseHeaders;
}

public byte[] getResponseBody() {
Expand All @@ -82,18 +65,16 @@ public String getContinuation() {
return this.getHeaderValue(HttpConstants.HttpHeaders.CONTINUATION);
}

public String getHeaderValue(String attribute) {
if (this.responseHeaderValues == null || this.responseHeaderNames.length != this.responseHeaderValues.length) {
return null;
}
public boolean containsHeader(String key) {
return this.responseHeaders != null && this.responseHeaders.containsKey(key);
}

for (int i = 0; i < responseHeaderNames.length; i++) {
if (responseHeaderNames[i].equalsIgnoreCase(attribute)) {
return responseHeaderValues[i];
}
}
public String setHeaderValue(String key, String value) {
return this.responseHeaders.put(key, value);
}

return null;
public String getHeaderValue(String attribute) {
return this.responseHeaders.get(attribute);
}

public CosmosResponseDiagnostics getCosmosResponseDiagnostics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,9 @@ private static void setRequestCharge(StoreResponse response, CosmosClientExcepti
Double.toString(totalRequestCharge));
}
// Set total charge as final charge for the response.
else if (response.getResponseHeaderNames() != null) {
for (int i = 0; i < response.getResponseHeaderNames().length; ++i) {
if (Strings.areEqualIgnoreCase(
response.getResponseHeaderNames()[i],
HttpConstants.HttpHeaders.REQUEST_CHARGE)) {
response.getResponseHeaderValues()[i] = Double.toString(totalRequestCharge);
break;
}
else {
if (response.containsHeader(HttpConstants.HttpHeaders.REQUEST_CHARGE)) {
response.setHeaderValue(HttpConstants.HttpHeaders.REQUEST_CHARGE, Double.toString(totalRequestCharge));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,11 @@ static RntbdResponse decode(final ByteBuf in) {
}

StoreResponse toStoreResponse(final RntbdContext context) {



checkNotNull(context, "context");

return new StoreResponse(
this.getStatus().code(),
this.headers.asList(context, this.getActivityId()),
this.headers.asMap(context, this.getActivityId()),
toByteArray(this.content)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.CorruptedFrameException;

import java.math.BigDecimal;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -209,17 +209,18 @@ List<Map.Entry<String, String>> asList(final RntbdContext context, final UUID ac

public Map<String, String> asMap(final RntbdContext context, final UUID activityId) {

final ImmutableMap.Builder<String, String> builder = ImmutableMap.builderWithExpectedSize(this.computeCount() + 2);
builder.put(new Entry(HttpHeaders.SERVER_VERSION, context.serverVersion()));
builder.put(new Entry(HttpHeaders.ACTIVITY_ID, activityId.toString()));
final HashMap<String, String> headersMap = new HashMap<>(this.computeCount() + 2);
headersMap.put(HttpHeaders.SERVER_VERSION, context.serverVersion());
headersMap.put(HttpHeaders.ACTIVITY_ID, activityId.toString());

this.collectEntries((token, toEntry) -> {
if (token.isPresent()) {
builder.put(toEntry.apply(token));
Map.Entry<String, String> entry = toEntry.apply(token);
headersMap.put(entry.getKey(), entry.getValue());
}
});

return builder.build();
return headersMap;
}

static RntbdResponseHeaders decode(final ByteBuf in) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -52,7 +53,7 @@ public void backoffRetryUtilityExecuteRetry() throws Exception {
retryPolicy = new TestRetryPolicy();
CosmosClientException exception = new CosmosClientException(410, exceptionText);
Mockito.when(callbackMethod.call()).thenThrow(exception, exception, exception, exception, exception)
.thenReturn(Mono.just(new StoreResponse(200, new ArrayList<>(), getUTF8BytesOrNull(responseText))));
.thenReturn(Mono.just(new StoreResponse(200, new HashMap<>(), getUTF8BytesOrNull(responseText))));
Mono<StoreResponse> monoResponse = BackoffRetryUtility.executeRetry(callbackMethod, retryPolicy);
StoreResponse response = validateSuccess(monoResponse);

Expand Down Expand Up @@ -88,7 +89,7 @@ public void backoffRetryUtilityExecuteAsync() {
CosmosClientException exception = new CosmosClientException(410, exceptionText);
Mono<CosmosClientException> exceptionMono = Mono.error(exception);
Mockito.when(parameterizedCallbackMethod.apply(Matchers.any(Quadruple.class))).thenReturn(exceptionMono, exceptionMono, exceptionMono, exceptionMono, exceptionMono)
.thenReturn(Mono.just(new StoreResponse(200, new ArrayList<>(), getUTF8BytesOrNull(responseText))));
.thenReturn(Mono.just(new StoreResponse(200, new HashMap<>(), getUTF8BytesOrNull(responseText))));
Mono<StoreResponse> monoResponse = BackoffRetryUtility.executeAsync(parameterizedCallbackMethod, retryPolicy, inBackoffAlternateCallbackMethod, Duration.ofSeconds(5), serviceRequest);
StoreResponse response = validateSuccess(monoResponse);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,32 @@
import java.math.BigDecimal;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.azure.cosmos.implementation.Utils.getUTF8BytesOrNull;

public class StoreResponseBuilder {
private int status;
private List<Map.Entry<String, String>> headerEntries;
private Map<String, String> headerEntries;
private String content;

public static StoreResponseBuilder create() {
return new StoreResponseBuilder();
}

public StoreResponseBuilder() {
headerEntries = new ArrayList<>();
headerEntries = new HashMap<>();
}

public StoreResponseBuilder withHeader(String key, String value) {
headerEntries.add(new AbstractMap.SimpleEntry(key, value));
headerEntries.put(key, value);
return this;
}

public StoreResponseBuilder withLSN(long lsn) {
headerEntries.add(new AbstractMap.SimpleEntry(WFConstants.BackendHeaders.LSN, Long.toString(lsn)));
headerEntries.put(WFConstants.BackendHeaders.LSN, Long.toString(lsn));
return this;
}

Expand All @@ -43,42 +44,42 @@ public StoreResponseBuilder withRequestCharge(BigDecimal requestCharge) {
}

public StoreResponseBuilder withRequestCharge(double requestCharge) {
headerEntries.add(new AbstractMap.SimpleEntry(HttpConstants.HttpHeaders.REQUEST_CHARGE, Double.toString(requestCharge)));
headerEntries.put(HttpConstants.HttpHeaders.REQUEST_CHARGE, Double.toString(requestCharge));
return this;
}

public StoreResponseBuilder withLocalLSN(long localLsn) {
headerEntries.add(new AbstractMap.SimpleEntry(WFConstants.BackendHeaders.LOCAL_LSN, Long.toString(localLsn)));
headerEntries.put(WFConstants.BackendHeaders.LOCAL_LSN, Long.toString(localLsn));
return this;
}

public StoreResponseBuilder withPartitionKeyRangeId(String partitionKeyRangeId) {
headerEntries.add(new AbstractMap.SimpleEntry(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID, partitionKeyRangeId));
headerEntries.put(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID, partitionKeyRangeId);
return this;
}

public StoreResponseBuilder withItemLocalLSN(long itemLocalLsn) {
headerEntries.add(new AbstractMap.SimpleEntry(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, Long.toString(itemLocalLsn)));
headerEntries.put(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, Long.toString(itemLocalLsn));
return this;
}

public StoreResponseBuilder withQuorumAckecdLsn(long quorumAckecdLsn) {
headerEntries.add(new AbstractMap.SimpleEntry(WFConstants.BackendHeaders.QUORUM_ACKED_LSN, Long.toString(quorumAckecdLsn)));
headerEntries.put(WFConstants.BackendHeaders.QUORUM_ACKED_LSN, Long.toString(quorumAckecdLsn));
return this;
}

public StoreResponseBuilder withQuorumAckecdLocalLsn(long quorumAckecdLocalLsn) {
headerEntries.add(new AbstractMap.SimpleEntry(WFConstants.BackendHeaders.QUORUM_ACKED_LOCAL_LSN, Long.toString(quorumAckecdLocalLsn)));
headerEntries.put(WFConstants.BackendHeaders.QUORUM_ACKED_LOCAL_LSN, Long.toString(quorumAckecdLocalLsn));
return this;
}

public StoreResponseBuilder withGlobalCommittedLsn(long globalCommittedLsn) {
headerEntries.add(new AbstractMap.SimpleEntry(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, Long.toString(globalCommittedLsn)));
headerEntries.put(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, Long.toString(globalCommittedLsn));
return this;
}

public StoreResponseBuilder withSessionToken(String sessionToken) {
headerEntries.add(new AbstractMap.SimpleEntry(HttpConstants.HttpHeaders.SESSION_TOKEN, sessionToken));
headerEntries.put(HttpConstants.HttpHeaders.SESSION_TOKEN, sessionToken);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.azure.cosmos.implementation.StoreResponseBuilder;
import com.azure.cosmos.implementation.Utils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
Expand Down Expand Up @@ -145,10 +146,12 @@ public void run() {

@Test(groups = "unit")
public void getLsnAndGlobalCommittedLsn() {
ImmutableList.Builder<Map.Entry<String, String>> builder = new ImmutableList.Builder<>();
builder.add(new AbstractMap.SimpleEntry<>(WFConstants.BackendHeaders.LSN, "3"));
builder.add(new AbstractMap.SimpleEntry<>(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, "2"));
ImmutableList<Map.Entry<String, String>> headers = builder.build();

ImmutableMap.Builder<String, String> builder = new ImmutableMap.Builder<>();

builder.put(WFConstants.BackendHeaders.LSN, "3");
builder.put(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, "2");
ImmutableMap<String, String> headers = builder.build();

StoreResponse sr = new StoreResponse(0, headers, null);
Utils.ValueHolder<Long> lsn = Utils.ValueHolder.initialize(-2l);
Expand Down
Loading