BEACHSIDE BLOG

Azure と GitHub と C# が好きなエンジニアの個人メモ ( ・ㅂ・)و ̑̑

Cosmos DB の Bulk Executor を使った一括インポート

ざっくりまとめ:

  • サポートしてる API は、Azure Cosmos DB SQL API と Gremlin API
  • 一括インポート API と一括更新 API
  • SDK v2 だと外部 SDK が必要だが、SDK v3 ではサポートされているので外部ライブラリは不要

Cosmos SDKv3 での Bulk import

実装方法

CosmosClient の初期化時に options で `` をつければ Bulk で実行される。

        var cosmosOptions = new CosmosClientOptions
        {
            AllowBulkExecution = true,
            SerializerOptions = new CosmosSerializationOptions
            {
                PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase,
            }
        };

実装サンプルはこんな感じ。

‘‘‘cs public class BulkImprtSample { private const string ConnectionString = "AccountEndpoint=https://cosmos-beachside-sandbox.documents.azure.com:443/;AccountKey=WeVjCCNtKdl851yBHuy2Vz3xOKXMUAClnVgMEPtS9xcUlFeIiJrbkfsLs93J5PYKY2QqOHxjQkLA8j095Imlqw==;"; private const string DatabaseName = "SampleDb"; private const string ContainerName = "Container2"; private const int AmountotoInsert = 100;

internal async Task RunBulkImportAsync()
{
    var itemsToInsert = Item.GenerateItems(AmountotoInsert);

    var cosmosOptions = new CosmosClientOptions
    {
        AllowBulkExecution = true,
        SerializerOptions = new CosmosSerializationOptions
        {
            PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase,
        }
    };

    var cosmosClient = new CosmosClient(ConnectionString, cosmosOptions);
    var container = cosmosClient.GetContainer(DatabaseName, ContainerName);
    var tasks = new List<Task>(AmountotoInsert);

    var stopwatch = new Stopwatch();
    stopwatch.Start();

    foreach (var item in itemsToInsert)
    {
        tasks.Add(container.UpsertItemAsync(item, new PartitionKey(item.Id))
            .ContinueWith(itemResponse =>
            {
                if (!itemResponse.IsCompletedSuccessfully)
                {
                    AggregateException innerExceptions = itemResponse.Exception.Flatten();
                    if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) is CosmosException cosmosException)
                    {
                        Console.WriteLine($"Received {cosmosException.StatusCode} ({cosmosException.Message}).");
                    }
                    else
                    {
                        Console.WriteLine($"Exception {innerExceptions.InnerExceptions.FirstOrDefault()}.");
                    }
                }
            }));
    }

    await Task.WhenAll(tasks);

    stopwatch.Stop();
    Console.WriteLine($"処理時間: {stopwatch.ElapsedMilliseconds}");

}

}

public class Item { public string Id { get; set; } public string Description { get; set; }

public static IReadOnlyCollection<Item> GenerateItems(int amount)
{
    return new Faker<Item>()
        .StrictMode(true)
        .RuleFor(i => i.Id, f => Guid.NewGuid().ToString())
        .RuleFor(i => i.Description, f => f.Internet.UserName())
        .Generate(amount);
}

}



### ドキュメント
やり方は matias が公式ブログで書いてる:

[https://devblogs.microsoft.com/cosmosdb/introducing-bulk-support-in-the-net-sdk/:embed:cite]


実装方法は Microsoft Learn でのドキュメントをみるとよい。


[https://learn.microsoft.com/ja-jp/azure/cosmos-db/nosql/tutorial-dotnet-bulk-import:embed:cite]