Skip to content

Commit

Permalink
Container Throughput: Fixes the throughput APIs to handle stale cache (
Browse files Browse the repository at this point in the history
…#1914)

* Fixed offer operation when container cache is stale

* Updated exception info

* Added missing database cleanup on tests.

* Fixed missing diagnostics and headers
  • Loading branch information
j82w committed Oct 8, 2020
1 parent 3158d18 commit f0e7326
Show file tree
Hide file tree
Showing 2 changed files with 235 additions and 30 deletions.
112 changes: 87 additions & 25 deletions Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed;
Expand Down Expand Up @@ -121,51 +122,62 @@ public async Task<ThroughputResponse> ReadThroughputAsync(
RequestOptions requestOptions,
CancellationToken cancellationToken = default)
{
string rid = await this.GetRIDAsync(cancellationToken);
CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
return await cosmosOffers.ReadThroughputAsync(rid, requestOptions, cancellationToken);
ThroughputResponse throughputResponse = await this.ReadThroughputIfExistsAsync(
diagnosticsContext,
requestOptions,
cancellationToken);

if (throughputResponse.StatusCode == HttpStatusCode.NotFound)
{
throw CosmosExceptionFactory.CreateNotFoundException(
message: $"Throughput is not configured for {this.Id}",
headers: throughputResponse.Headers,
diagnosticsContext: diagnosticsContext);
}

return throughputResponse;
}

public async Task<ThroughputResponse> ReadThroughputIfExistsAsync(
public Task<ThroughputResponse> ReadThroughputIfExistsAsync(
CosmosDiagnosticsContext diagnosticsContext,
RequestOptions requestOptions,
CancellationToken cancellationToken = default)
{
string rid = await this.GetRIDAsync(cancellationToken);
CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
return await cosmosOffers.ReadThroughputIfExistsAsync(rid, requestOptions, cancellationToken);
return this.OfferRetryHelperForStaleRidCacheAsync(
(rid) => cosmosOffers.ReadThroughputIfExistsAsync(rid, requestOptions, cancellationToken),
diagnosticsContext,
cancellationToken);
}

public async Task<ThroughputResponse> ReplaceThroughputAsync(
public Task<ThroughputResponse> ReplaceThroughputAsync(
CosmosDiagnosticsContext diagnosticsContext,
int throughput,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
string rid = await this.GetRIDAsync(cancellationToken);

CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
return await cosmosOffers.ReplaceThroughputAsync(
targetRID: rid,
throughput: throughput,
return this.ReplaceThroughputAsync(
diagnosticsContext: diagnosticsContext,
throughputProperties: ThroughputProperties.CreateManualThroughput(throughput),
requestOptions: requestOptions,
cancellationToken: cancellationToken);
}

public async Task<ThroughputResponse> ReplaceThroughputIfExistsAsync(
public Task<ThroughputResponse> ReplaceThroughputIfExistsAsync(
CosmosDiagnosticsContext diagnosticsContext,
ThroughputProperties throughput,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
string rid = await this.GetRIDAsync(cancellationToken);

CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
return await cosmosOffers.ReplaceThroughputPropertiesIfExistsAsync(
targetRID: rid,
throughputProperties: throughput,
requestOptions: requestOptions,
cancellationToken: cancellationToken);
return this.OfferRetryHelperForStaleRidCacheAsync(
(rid) => cosmosOffers.ReplaceThroughputPropertiesIfExistsAsync(
targetRID: rid,
throughputProperties: throughput,
requestOptions: requestOptions,
cancellationToken: cancellationToken),
diagnosticsContext,
cancellationToken);
}

public async Task<ThroughputResponse> ReplaceThroughputAsync(
Expand All @@ -174,13 +186,21 @@ public async Task<ThroughputResponse> ReplaceThroughputAsync(
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
string rid = await this.GetRIDAsync(cancellationToken);
CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
return await cosmosOffers.ReplaceThroughputPropertiesAsync(
rid,
ThroughputResponse throughputResponse = await this.ReplaceThroughputIfExistsAsync(
diagnosticsContext,
throughputProperties,
requestOptions,
cancellationToken);

if (throughputResponse.StatusCode == HttpStatusCode.NotFound)
{
throw CosmosExceptionFactory.CreateNotFoundException(
message: $"Throughput is not configured for {this.Id}",
headers: throughputResponse.Headers,
diagnosticsContext: diagnosticsContext);
}

return throughputResponse;
}

public Task<ResponseMessage> DeleteContainerStreamAsync(
Expand Down Expand Up @@ -392,6 +412,48 @@ public override Task<CollectionRoutingMap> GetRoutingMapAsync(CancellationToken
.Unwrap();
}

private async Task<ThroughputResponse> OfferRetryHelperForStaleRidCacheAsync(
Func<string, Task<ThroughputResponse>> executeOfferOperation,
CosmosDiagnosticsContext diagnosticsContext,
CancellationToken cancellationToken)
{
string rid = await this.GetRIDAsync(cancellationToken);
ThroughputResponse throughputResponse = await executeOfferOperation(rid);
if (throughputResponse.StatusCode != HttpStatusCode.NotFound)
{
return throughputResponse;
}

// Check if RID cache is stale
ResponseMessage responseMessage = await this.ReadContainerStreamAsync(
diagnosticsContext: diagnosticsContext,
requestOptions: null,
cancellationToken: cancellationToken);

// Container does not exist
if (responseMessage.StatusCode == HttpStatusCode.NotFound)
{
return new ThroughputResponse(
responseMessage.StatusCode,
responseMessage.Headers,
null,
diagnosticsContext.Diagnostics);
}

responseMessage.EnsureSuccessStatusCode();

ContainerProperties containerProperties = this.ClientContext.SerializerCore.FromStream<ContainerProperties>(responseMessage.Content);

// The RIDs match so return the original response.
if (string.Equals(rid, containerProperties.ResourceId))
{
return throughputResponse;
}

// Get the offer with the new rid value
return await executeOfferOperation(containerProperties.ResourceId);
}

private Task<ResponseMessage> ReplaceStreamInternalAsync(
CosmosDiagnosticsContext diagnosticsContext,
Stream streamPayload,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
{
using System;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -33,6 +32,144 @@ public void TestCleanup()
this.cosmosClient.Dispose();
}

[TestMethod]
public async Task NegativeContainerThroughputTestAsync()
{
// Create a database and container to make sure all the caches are warmed up
Database db1 = await this.cosmosClient.CreateDatabaseAsync(
Guid.NewGuid().ToString(),
400);

// Container does not have an offer
Container container = await db1.CreateContainerAsync(
Guid.NewGuid().ToString(),
"/pk");

await container.CreateItemAsync(ToDoActivity.CreateRandomToDoActivity());

try
{
await container.ReadThroughputAsync(requestOptions: null);
Assert.Fail("Should throw exception");
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
Assert.IsTrue(ex.Message.Contains(container.Id));
}

try
{
await container.ReplaceThroughputAsync(400);
Assert.Fail("Should throw exception");
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
Assert.IsTrue(ex.Message.Contains(container.Id));
}

int? throughput = await container.ReadThroughputAsync();
Assert.IsNull(throughput);

{
ThroughputResponse offerAfterRecreate = await ((ContainerInternal)container).ReadThroughputIfExistsAsync(
requestOptions: default,
cancellationToken: default);
Assert.AreEqual(HttpStatusCode.NotFound, offerAfterRecreate.StatusCode);
}

{
ThroughputResponse offerAfterRecreate = await ((ContainerInternal)container).ReplaceThroughputIfExistsAsync(
throughput: ThroughputProperties.CreateManualThroughput(400),
requestOptions: default,
cancellationToken: default);
Assert.AreEqual(HttpStatusCode.NotFound, offerAfterRecreate.StatusCode);
}

await db1.DeleteAsync();
}

[TestMethod]
public async Task ContainerRecreateOfferTestAsync()
{
// Create a database and container to make sure all the caches are warmed up
Database db1 = await this.cosmosClient.CreateDatabaseAsync(
Guid.NewGuid().ToString());
Container container = await db1.CreateContainerAsync(
Guid.NewGuid().ToString(),
"/pk",
400);
await container.CreateItemAsync(ToDoActivity.CreateRandomToDoActivity());

ThroughputResponse offer = await container.ReadThroughputAsync(requestOptions: null);
Assert.AreEqual(400, offer.Resource.Throughput);
ThroughputProperties replaceOffer = await container.ReplaceThroughputAsync(2000);
Assert.AreEqual(2000, replaceOffer.Throughput);

{
// Recreate the container with the same name using a different client
await this.RecreateContainerUsingDifferentClient(db1.Id, container.Id, 3000);

ThroughputProperties offerAfterRecreate = await container.ReplaceThroughputAsync(400);
Assert.AreEqual(400, offerAfterRecreate.Throughput);
}

{
// Recreate the container with the same name using a different client
await this.RecreateContainerUsingDifferentClient(db1.Id, container.Id, 3000);

ThroughputProperties offerAfterRecreate = await container.ReadThroughputAsync(requestOptions: null);
Assert.AreEqual(3000, offerAfterRecreate.Throughput);
}

{
// Recreate the container with the same name using a different client
await this.RecreateContainerUsingDifferentClient(db1.Id, container.Id, 3000);

int? throughput = await container.ReadThroughputAsync();
Assert.AreEqual(3000, throughput.Value);
}

{
// Recreate the container with the same name using a different client
await this.RecreateContainerUsingDifferentClient(db1.Id, container.Id, 3000);

ThroughputProperties offerAfterRecreate = await ((ContainerInternal)container).ReadThroughputIfExistsAsync(
requestOptions: default,
cancellationToken: default);
Assert.AreEqual(3000, offerAfterRecreate.Throughput);
}

{
// Recreate the container with the same name using a different client
await this.RecreateContainerUsingDifferentClient(db1.Id, container.Id, 3000);

ThroughputProperties offerAfterRecreate = await ((ContainerInternal)container).ReplaceThroughputIfExistsAsync(
throughput: ThroughputProperties.CreateManualThroughput(400),
requestOptions: default,
cancellationToken: default);

Assert.AreEqual(400, offerAfterRecreate.Throughput);
}

await db1.DeleteAsync();
}

private async Task RecreateContainerUsingDifferentClient(
string databaseId,
string containerId,
int throughput)
{
// Recreate the database with the same name using a different client
using (CosmosClient tempClient = TestCommon.CreateCosmosClient())
{
Database db = tempClient.GetDatabase(databaseId);
Container temp = db.GetContainer(containerId);
await temp.DeleteContainerAsync();
Container db1Container = await db.CreateContainerAsync(containerId, "/pk", throughput);
await db1Container.CreateItemAsync(ToDoActivity.CreateRandomToDoActivity());
}
}

[TestMethod]
public async Task CreateDropAutoscaleDatabaseStreamApi()
{
Expand Down Expand Up @@ -167,15 +304,15 @@ public async Task ContainerAutoscaleIfExistsTest()

throughputResponse = await containerCore.ReadThroughputIfExistsAsync(
requestOptions: null,
default(CancellationToken));
default);
Assert.IsNotNull(throughputResponse);
Assert.IsTrue(throughputResponse.Resource.Throughput > 400);
Assert.AreEqual(5000, throughputResponse.Resource.AutoscaleMaxThroughput);

throughputResponse = await containerCore.ReplaceThroughputIfExistsAsync(
ThroughputProperties.CreateAutoscaleThroughput(6000),
requestOptions: null,
default(CancellationToken));
default);
Assert.IsNotNull(throughputResponse);
Assert.IsTrue(throughputResponse.Resource.Throughput > 400);
Assert.AreEqual(6000, throughputResponse.Resource.AutoscaleMaxThroughput);
Expand Down Expand Up @@ -274,6 +411,8 @@ public async Task CreateDatabaseIfNotExistTest()
Assert.IsNotNull(autoscale);
Assert.IsNotNull(autoscale.Resource.Throughput);
Assert.AreEqual(5000, autoscale.Resource.AutoscaleMaxThroughput);

await databaseResponse.Database.DeleteAsync();
}

[TestMethod]
Expand All @@ -297,6 +436,8 @@ public async Task CreateContainerIfNotExistTest()
containerProperties,
ThroughputProperties.CreateAutoscaleThroughput(5000));
Assert.AreEqual(HttpStatusCode.OK, containerResponse.StatusCode);

await database.DeleteAsync();
}

[TestMethod]
Expand Down Expand Up @@ -354,10 +495,12 @@ public async Task CreateDropAutoscaleContainerStreamApi()
ContainerInternal streamContainer = (ContainerInlineCore)database.GetContainer(streamContainerId);
ThroughputResponse autoscaleIfExists = await streamContainer.ReadThroughputIfExistsAsync(
requestOptions: null,
default(CancellationToken));
default);
Assert.IsNotNull(autoscaleIfExists);
Assert.AreEqual(5000, autoscaleIfExists.Resource.AutoscaleMaxThroughput);
}

await database.DeleteAsync();
}

[TestMethod]
Expand All @@ -378,7 +521,7 @@ public async Task CreateDropAutoscaleContainer()
throughputResponse = await container.ReplaceThroughputAsync(
ThroughputProperties.CreateAutoscaleThroughput(6000),
requestOptions: null,
cancellationToken: default(CancellationToken));
cancellationToken: default);

Assert.IsNotNull(throughputResponse);
Assert.AreEqual(6000, throughputResponse.Resource.AutoscaleMaxThroughput);
Expand Down

0 comments on commit f0e7326

Please sign in to comment.