-
Notifications
You must be signed in to change notification settings - Fork 3.1k
/
VectorStore_DataIngestion_CustomMapper.cs
204 lines (180 loc) · 9.63 KB
/
VectorStore_DataIngestion_CustomMapper.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
// Copyright (c) Microsoft. All rights reserved.
using System.Text.Json;
using System.Text.Json.Nodes;
using Memory.VectorStoreFixtures;
using Microsoft.SemanticKernel.Connectors.AzureOpenAI;
using Microsoft.SemanticKernel.Connectors.Redis;
using Microsoft.SemanticKernel.Data;
using Microsoft.SemanticKernel.Embeddings;
using StackExchange.Redis;
namespace Memory;
/// <summary>
/// An example showing how to ingest data into a vector store using <see cref="RedisVectorStore"/> with a custom mapper.
/// In this example, the storage model differs significantly from the data model, so a custom mapper is used to map between the two.
/// A <see cref="VectorStoreRecordDefinition"/> is used to define the schema of the storage model, which means that the connector
/// will not try to infer the schema from the data model.
/// In storage the data is stored as a JSON object that looks similar to this:
/// <code>
/// {
/// "Term": "API",
/// "Definition": "Application Programming Interface. A set of rules and specifications that allow software components to communicate and exchange data.",
/// "DefinitionEmbedding": [ ... ]
/// }
/// </code>
/// However, the data model is a class with a property for the key and two dictionaries: one for the data (Term and Definition) and one for the vector (DefinitionEmbedding).
///
/// The example shows the following steps:
/// 1. Create an embedding generator.
/// 2. Create a Redis Vector Store using a custom factory for creating collections.
/// When constructing a collection, the factory injects a custom mapper that maps between the data model and the storage model if required.
/// 3. Ingest some data into the vector store.
/// 4. Read the data back from the vector store.
///
/// You need a local instance of Docker running, since the associated fixture will try to start a Redis container in the local Docker instance to run against.
/// </summary>
public class VectorStore_DataIngestion_CustomMapper(ITestOutputHelper output, VectorStoreRedisContainerFixture redisFixture) : BaseTest(output), IClassFixture<VectorStoreRedisContainerFixture>
{
/// <summary>
/// Storage schema for glossary entries: a string key, two string data fields ("Term" and "Definition"),
/// and one vector field ("DefinitionEmbedding"). The example passes this definition when getting the
/// collection, and the collection factory compares against it to decide when to inject the custom mapper.
/// </summary>
private static readonly VectorStoreRecordDefinition s_glossaryDefinition = new VectorStoreRecordDefinition
{
    Properties = new List<VectorStoreRecordProperty>()
    {
        // Record key.
        new VectorStoreRecordKeyProperty("Key", typeof(string)),

        // Plain data fields.
        new VectorStoreRecordDataProperty("Term", typeof(string)),
        new VectorStoreRecordDataProperty("Definition", typeof(string)),

        // Embedding generated from the definition text.
        new VectorStoreRecordVectorProperty("DefinitionEmbedding", typeof(ReadOnlyMemory<float>))
        {
            Dimensions = 1536,
            DistanceFunction = DistanceFunction.DotProductSimilarity,
        },
    },
};
/// <summary>
/// Runs the sample end to end: creates an embedding service, connects to Redis, creates the
/// "skglossary" collection from the explicit record definition, generates embeddings for the
/// sample entries, upserts them, and reads one record back with its vector.
/// </summary>
[Fact]
public async Task ExampleAsync()
{
    // Create an embedding generation service.
    var textEmbeddingGenerationService = new AzureOpenAITextEmbeddingGenerationService(
        TestConfiguration.AzureOpenAIEmbeddings.DeploymentName,
        TestConfiguration.AzureOpenAIEmbeddings.Endpoint,
        TestConfiguration.AzureOpenAIEmbeddings.ApiKey);

    // Initiate the docker container and construct the vector store using the custom factory for creating collections.
    await redisFixture.ManualInitializeAsync();

    // Connect without blocking, and dispose the multiplexer when the example finishes so the
    // connection it holds is released instead of leaking.
    using ConnectionMultiplexer redis = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
    var vectorStore = new RedisVectorStore(redis.GetDatabase(), new() { VectorStoreCollectionFactory = new Factory() });

    // Get and create collection if it doesn't exist, using the record definition containing the storage model.
    var collection = vectorStore.GetCollection<string, GenericDataModel>("skglossary", s_glossaryDefinition);
    await collection.CreateCollectionIfNotExistsAsync();

    // Create glossary entries and generate embeddings for them.
    // The embedding calls are already asynchronous, so they are started directly rather than
    // being wrapped in Task.Run. Each entry writes only to its own Vectors dictionary.
    var glossaryEntries = CreateGlossaryEntries().ToList();
    var embeddingTasks = glossaryEntries.Select(async entry =>
    {
        entry.Vectors["DefinitionEmbedding"] = await textEmbeddingGenerationService.GenerateEmbeddingAsync((string)entry.Data["Definition"]);
    });
    await Task.WhenAll(embeddingTasks);

    // Upsert the glossary entries into the collection and return their keys.
    var upsertedKeysTasks = glossaryEntries.Select(x => collection.UpsertAsync(x));
    var upsertedKeys = await Task.WhenAll(upsertedKeysTasks);

    // Retrieve one of the upserted records from the collection, including its vector.
    var upsertedRecord = await collection.GetAsync(upsertedKeys.First(), new() { IncludeVectors = true });

    // Write upserted keys and one of the upserted records to the console.
    Console.WriteLine($"Upserted keys: {string.Join(", ", upsertedKeys)}");
    Console.WriteLine($"Upserted record: {JsonSerializer.Serialize(upsertedRecord)}");
}
/// <summary>
/// A custom mapper that maps between the data model and the storage model.
/// The storage model is a tuple of the record key and a JSON node holding the
/// "Term", "Definition" and "DefinitionEmbedding" properties.
/// </summary>
private sealed class Mapper : IVectorStoreRecordMapper<GenericDataModel, (string Key, JsonNode Node)>
{
    /// <summary>
    /// Converts a data model instance into its key plus a JSON object with the term, definition and embedding.
    /// </summary>
    /// <param name="dataModel">The record to store; its "Term", "Definition" and "DefinitionEmbedding" entries must be present.</param>
    /// <returns>The record key and the JSON node to store.</returns>
    public (string Key, JsonNode Node) MapFromDataToStorageModel(GenericDataModel dataModel)
    {
        var jsonObject = new JsonObject();

        jsonObject.Add("Term", dataModel.Data["Term"].ToString());
        jsonObject.Add("Definition", dataModel.Data["Definition"].ToString());

        // The vector is held as a boxed ReadOnlyMemory<float>; write it out as a JSON array of numbers.
        var vector = (ReadOnlyMemory<float>)dataModel.Vectors["DefinitionEmbedding"];
        var jsonArray = new JsonArray(vector.ToArray().Select(x => JsonValue.Create(x)).ToArray());
        jsonObject.Add("DefinitionEmbedding", jsonArray);

        return (dataModel.Key, jsonObject);
    }

    /// <summary>
    /// Converts a stored key and JSON node back into the data model.
    /// </summary>
    /// <param name="storageModel">The record key and the JSON node read from storage.</param>
    /// <param name="options">Mapping options supplied by the connector; not consulted here.</param>
    /// <returns>The data model, with the embedding populated only when the stored node contains one.</returns>
    public GenericDataModel MapFromStorageToDataModel((string Key, JsonNode Node) storageModel, StorageToDataModelMapperOptions options)
    {
        var dataModel = new GenericDataModel
        {
            Key = storageModel.Key,
            Data = new Dictionary<string, object>
            {
                { "Term", (string)storageModel.Node["Term"]! },
                { "Definition", (string)storageModel.Node["Definition"]! }
            },
            Vectors = new Dictionary<string, object>()
        };

        // The embedding property may be missing from the stored node; dereferencing it
        // unconditionally would throw a NullReferenceException.
        // NOTE(review): presumably the connector omits it when a read does not request vectors - confirm.
        var embeddingNode = storageModel.Node["DefinitionEmbedding"];
        if (embeddingNode is not null)
        {
            dataModel.Vectors["DefinitionEmbedding"] = new ReadOnlyMemory<float>(embeddingNode.AsArray().Select(x => (float)x!).ToArray());
        }

        return dataModel;
    }
}
/// <summary>
/// A factory for creating collections in the vector store.
/// It injects the custom <see cref="Mapper"/> for glossary collections and otherwise creates a standard collection.
/// </summary>
private sealed class Factory : IRedisVectorStoreRecordCollectionFactory
{
    /// <summary>
    /// Creates a Redis JSON collection for the requested key and record types.
    /// </summary>
    /// <param name="database">The Redis database the collection operates on.</param>
    /// <param name="name">The name of the collection.</param>
    /// <param name="vectorStoreRecordDefinition">The optional record definition describing the storage schema.</param>
    /// <returns>The created collection, typed as requested by the caller.</returns>
    /// <exception cref="NotSupportedException">The created collection does not implement the requested key/record interface.</exception>
    public IVectorStoreRecordCollection<TKey, TRecord> CreateVectorStoreRecordCollection<TKey, TRecord>(IDatabase database, string name, VectorStoreRecordDefinition? vectorStoreRecordDefinition)
        where TKey : notnull
        where TRecord : class
    {
        // If the record definition is the glossary definition and the record type is the generic data model, inject the custom mapper into the collection options.
        if (vectorStoreRecordDefinition == s_glossaryDefinition && typeof(TRecord) == typeof(GenericDataModel))
        {
            var customCollection = new RedisJsonVectorStoreRecordCollection<GenericDataModel>(database, name, new() { VectorStoreRecordDefinition = vectorStoreRecordDefinition, JsonNodeCustomMapper = new Mapper() });
            return AsRequestedCollection<TKey, TRecord>(customCollection, name);
        }

        // Otherwise, just create a standard collection with the default mapper.
        var collection = new RedisJsonVectorStoreRecordCollection<TRecord>(database, name, new() { VectorStoreRecordDefinition = vectorStoreRecordDefinition });
        return AsRequestedCollection<TKey, TRecord>(collection, name);
    }

    /// <summary>
    /// Converts the created collection to the interface the caller asked for, failing loudly
    /// instead of returning null when the key or record type is not one the collection implements.
    /// </summary>
    private static IVectorStoreRecordCollection<TKey, TRecord> AsRequestedCollection<TKey, TRecord>(object collection, string name)
        where TKey : notnull
        where TRecord : class
    {
        return collection as IVectorStoreRecordCollection<TKey, TRecord>
            ?? throw new NotSupportedException(
                $"The collection created for '{name}' does not support key type '{typeof(TKey).Name}' with record type '{typeof(TRecord).Name}'.");
    }
}
/// <summary>
/// Sample generic data model class that can store any data.
/// All members default to empty, non-null values so a freshly constructed instance is safe to use.
/// </summary>
private sealed class GenericDataModel
{
    /// <summary>The record key.</summary>
    public string Key { get; set; } = string.Empty;

    /// <summary>Data fields of the record, keyed by property name.</summary>
    public Dictionary<string, object> Data { get; set; } = new();

    /// <summary>Vector fields of the record, keyed by property name.</summary>
    public Dictionary<string, object> Vectors { get; set; } = new();
}
/// <summary>
/// Lazily produces the sample glossary entries as generic data model instances.
/// Each entry has its key, a "Term" and "Definition" data field, and an empty vector dictionary
/// for the caller to fill in.
/// </summary>
/// <returns>A sequence of sample glossary entries.</returns>
private static IEnumerable<GenericDataModel> CreateGlossaryEntries()
{
    // Key, term and definition of every sample, in the order they are yielded.
    (string Key, string Term, string Definition)[] samples =
    {
        ("1", "API", "Application Programming Interface. A set of rules and specifications that allow software components to communicate and exchange data."),
        ("2", "Connectors", "Connectors allow you to integrate with various services provide AI capabilities, including LLM, AudioToText, TextToAudio, Embedding generation, etc."),
        ("3", "RAG", "Retrieval Augmented Generation - a term that refers to the process of retrieving additional data to provide as context to an LLM to use when generating a response (completion) to a user’s question (prompt)."),
    };

    foreach (var (key, term, definition) in samples)
    {
        yield return new GenericDataModel
        {
            Key = key,
            Data = new Dictionary<string, object>
            {
                ["Term"] = term,
                ["Definition"] = definition,
            },
            Vectors = new Dictionary<string, object>(),
        };
    }
}
}