BEACHSIDE BLOG

Azure と GitHub と C# が好きなエンジニアの個人メモ ( ・ㅂ・)و ̑̑

Cosmos DB Change Feed の Azure Functions (C#) 実装入門 2023 年編 (Function App / Cosmos DB Trriger)

2022年の後半に Microsoft.Azure.WebJobs.Extensions.CosmosDBv4 が GA して実装方法も改良されたので、ここで改めて Cosmos DB の Change Feed の開発方法 ≒ Cosmos DB Trigger の Function App の実装の基礎や Tips を書いていきます。

なお、Change Feed の概念やどんな問題に向き合うときに使う武器なのかの理解を進めたい方は、三宅さんの記事をみていただくとスーパーよい学びになります。

k-miyake.github.io

ということで本題に入っていきますが、今回は Visual Studio 2022 の 17.4.3 でやってます。バージョンが古いと、事前準備で行う作業ができなかったりするので注意が必要です。

事前準備

Change Feed を動かすうえで Cosmos DB のリソースが必要なのでその作成と、VS で必要な準備を書いていきます。

Cosmos DB のリソースの作成

Cosmos DB がない Change Feed 自体も動かせないのでリソースを作成しておきましょう。日本語の翻訳が微妙なので補足しておくと Container name は日本語のようなダブルバイトの文字を使うのはやめましょう。ドキュメントでは「アイテム」となってますが「Item」とした方が色んな意味で無難です。

learn.microsoft.com

Azure のリソース作れないよってときは Azure Cosmos DB Emulator を使うのも悪くないですが、ここでは Azure にリソースを作成したとして話を進めます。

Visual Studion の準備

はじめて Function App を作る場合は、まず Visual Studio で 「Auzre の開発」のワークロードが含まれているか確認しましょう。Visual Studio Installer を起動 → 変更 ボタンを押すと、インストールしているワークロードが確認できます。ここで「Azure の開発」にチェックがついてるかを確認し、チェックがオフの場合はチェックをオンにしてインストールしましょう。

次は Azure Functions のツールを最新にしましょう。Visual Studio を適当に (コードなしで実行とかで OK)で起動して、上部の ツールオプション をクリックします。

プロジェクトおよびソリューション 配下にある Azure Functions をクリックします。更新の確認 ボタンをクリックして最新かを確認しましょう。最新でなければインストールのボタンが出てくるのでインストールします。画面は 更新の確認 ボタンが disabled の状態でシーンってなって動いてるかわかりにくいですが、ボタンが押せるようになるまで待っていればそのうち更新がおわります。

これで事前準備は完了。一度 VS を閉じましょう (手元に最新の VS しかなくて、VS を再起動する必要あったか覚えてないで念のため)

Cosmos DB Trigger の Function App 作成

C# でコード書いていきましょう。と言ってもほぼ書く必要なくできてしまいます。

プロジェクトの作成

VS を起動して 新しいプロジェクトの作成 をクリックします。

上部の検索バーで「azure functions」と入力して検索すると、C# の Azure Functions のテンプレートが表示されます。これをダブルクリックします。一回クリックして、次へ ボタンを押してもいいですが、ダブルクリックで進んだ方が楽です。

この画面の入力項目は適宜入力して 次へ をクリックします。

ここでは以下を参考に入力します。記載のないところはデフォルトでよいです。これで 作成 をクリックするとプロジェクトが作成されます。

項目 説明
Functions worker 「.NET6.0」を選びます。.NET 7.0 Isolated は現時点では不安定すぎて時期尚早な印象です。
Function 「Cosmos DB Trigger」を選びます。
Collection name Cosmos DB のリソースを作成した際につけた Container 名を入力します。
Connection string setting name 接続文字列を環境変数から取得する際のキーの名称になります。なんでもよいですがここでは「CosmosConnection」とつけました。
Database name Cosmos DB のリソースを作成した際につけた Database 名を入力します。

プロジェクト作成後、「依存関係に接続する」のウインドウがでます。Cosmos DB の接続文字列を取ってきて、ユーザーシークレットにセットしてくれる便利な機能です。

(前のウインドウで「dependencies を構成する」にチェックが入っているために出てくる機能ですが、個人的には慣れているせいか portal から接続文字列をとってセットした方が早いのと依存関係周りがめんどいので使ってないんですが...)

Azure Cosmos DB をクリックすると、認証の画面出てきて Azure から Cosmos のリソースの選択とか諸々出てくるのでよしなに選んで終わらせます。よくわからない場合はデフォルトのまますすめれば OK です (本質から外れるので説明省きます)

接続文字列が、CosmosConnection というキーでユーザーシクレっとに保存されていますので一応確認しておきましょうか。ソリューションエクスプローラーで Function App のプロジェクトを右クリック → ユーザーシークレットの管理 をクリックします。これで正しくセットされていることが確認できます。

この機能だとユーザーシークレットに入りますが、それ以外の方法として、デフォルトで gitignore に含まれている local.settings.json の Values セクションの中にセットしても問題ないです。

Microsoft.Azure.WebJobs.Extensions.CosmosDB のバージョン確認と更新

Ctrl + Q キーで検索にカーソルを移動し「nuget」と入力して NuGet パッケージの管理をクリックします (ソリューションの方でもとっちでもよいです) 。

2022年12月で最新の Visual Studio 2022 の 17.4.3 では、Azure Functions のテンプレートは Microsoft.Azure.WebJobs.Extensions.CosmosDB の最新バージョン v4 が適用されていないので、手動で更新します。ついでに Microsoft.Sdk.Functions も最新にしておきましょう。

ちなみに2023 年でそのうちデフォルトで v4 系に変わるとこの作業はいらなくなります。

更新が終わったら「Function1.cs」を開きます。まずは以下図の赤文字でエラーになっている3か所を修正します。

修正したコードは以下です。これで v4 の CosmosDBTrigger 用になりました。

public static class Function1
{
    [FunctionName("Function1")]
    public static void Run([CosmosDBTrigger(
            databaseName: "ToDoList",
            containerName: "Item",
            Connection = "CosmosConnection",
            LeaseContainerName = "leases")]IReadOnlyList<Document> input,
        ILogger log)
    {  // 以下省略

ざっくりな解説ですが、先ほど UI で設定した内容は CosmosDBTrigger のプロパティとしてセットされます。これと (先ほど自動でユーザーシークレットに設定された) 接続文字列だけで Cosmos DB への接続して Feed を受け取る準備が完了ってわけです。

IReadOnlyList<Document> の定義はエラーがでたままです。ここを直していきます。

余談ですが v3 までは IReadOnlyList の generics に Microsoft.Azure.Document namespace の Document class を指定せなあかんかったですが、v4 から自由に型を指定できるようになりました。ということで自分で必要な class を用意して定義していきます。

冒頭の Cosmos DB のリソース作成のチュートリアルでは以下の JSON schema でデータを入れるサンプルになっていました。

{
    "id": "1",
    "category": "personal",
    "name": "groceries",
    "description": "Pick up apples and strawberries.",
    "isComplete": false
}

これを class にして以下のようにしてコード全体を以下のようにしてみました (サンプル程度にシンプルなら class じゃなくて record でもよいです)。

using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;

namespace FunctionApp1;

public static class Function1
{
    [FunctionName("Function1")]
    public static void Run([CosmosDBTrigger(
            databaseName: "SampleDb",
            containerName: "Collection1",
            Connection = "CosmosConnection",
            LeaseContainerName = "leases")]IReadOnlyList<Todo> input,
        ILogger log)
    {
        foreach (var todo in input)
        {
            log.LogInformation($"id: {todo.Id}; category: {todo.Category}; isComplete:{todo.IsComplete}");
        }
    }
}

public class Todo
{
    public string Id { get; set; }
    public string Category { get; set; }
    public string Description { get; set; }
    public bool IsComplete { get; set; }
}

lease container が必要

Change Feed は LeaseContainerName で指定した container (今回だと leases ) で状態を管理して feed をしています。つまりこの container を作成する必要があります。ということで Azure portal で leases という名前の container を作成しておきましょう。partition key は /id で大丈夫です。

手作業で作成しなくても以下のように CreateLeaseContainerIfNotExists = true を書いておけば自動で作成してくれますが、毎回 lease container の存在チェックされるのはパフォーマンス的にもうざいので、実運用で使うのはあまりおすすめできないです。

    public static void Run([CosmosDBTrigger(
            databaseName: "ToDoList",
            containerName: "Item",
            Connection = "CosmosConnection",
            CreateLeaseContainerIfNotExists = true,
            LeaseContainerName = "leases")]IReadOnlyList<Todo> input,

lease の仕組みはそのうち しばやん が解説してくれるので楽しみにしててください (後日リンクをつけます多分)。

ここまででデバッグできる準備ができました。

デバッグ実行で動作確認

デバッグ実行して、Cosmos DB にデータを入れたり変更したりしてみましょう。今回のコードだと、こんな感じで Feed されたデータが取得できました。

エラーが起きる場合は...接続文字列が間違ってることは少ないのでエラーメッセージをちゃんと読めば解決できると思います。多くの場合以下の内容かなと思います。

  • Database name, Container name が間違ってる
  • lease の container がない

Change Feed を取得するところまでは非常に簡単に実現できるので、あとはビジネスロジックに集中できるってのが開発体験として非常に良い点です。

ちなみに、今回はデフォルトで Cosmos DB ではキャメルケースで保存ざれているデータを C# のパスカルケースに紐づけてデータを取得してくれてます。class のプロパティがキャメルケースでもマッピングされますが、本題からずれるのでここで詳しく話すのはやめておきます。

シリアライザーの変更は可能ですが...まぁここで書くとさらに長くなるのでやめておきます...すぐ脱線しそうになる。。。

最初に知っておきたい Change Feed の超基本

「わーい動いて簡単」って話しで終わってもしゃーないので、実運用に向けて最初に知っておきたい超基本をいくつか紹介します。

複数の Feed をしたいときはどうする?

Change Feed は、1つの Container から複数の Feed を送ることが可能です。その分、状態管理も必要になってきます。その際は LeaseContainerPrefix を設定します。値はメソッド名を付けておくのがわかりやすくてよいと思います。
ただ、今回のサンプルは Function1 という雑な名前なので、実際に開発するときは意味のあるメソッド名にしてそれを prefix にしましょう。

    [FunctionName(nameof(Function1))]
    public static void Run([CosmosDBTrigger(
            databaseName: "ToDoList",
            containerName: "Item",
            Connection = "CosmosConnection",
            LeaseContainerPrefix = nameof(Function1),
            LeaseContainerName = "leases")]IReadOnlyList<Todo> input,
        ILogger log)

ちなみに複数の LeaseContainer を用意することもできますが、lease の container の RU の消費は気にならないほど小さいので、そこの問題が出ない限りは 1つの lease container に prifix をつける対応で問題ないです。

Cosmos DB Trigger はポーリング

Change Feed って名前から、Cosmos DB の変更があったらデータが飛んでくる雰囲気を感じますが、実態は Function App がポーリングしてるだけです。

ポーリングをしていることで、例外発生時のリトライのしやすさを実現できています。

ポーリングの間隔はデフォルトで5秒なので、この値をユースケースに合わせて長くしたり短くしたりできます。FeedPollDelay で指定ができます。以下だと1秒おきになります。

ちなみに一度データを取ってきて次もデータがある場合は delay 無しでとってきます。Feed のデータがなくなったら delay をするって仕組みです。

    public static void Run([CosmosDBTrigger(
            databaseName: "ToDoList",
            containerName: "Item",
            Connection = "CosmosConnection",
            LeaseContainerPrefix = nameof(Function1),
            FeedPollDelay = 1000,
            LeaseContainerName = "leases")]IReadOnlyList<Todo> input,
        ILogger log)

Delete は非対応

現状ではデータの insert / update が起きたときに Feed が取れます。delete されたときは Feed がとれないので、それをやりたいならフラグなどつけて soft delete しましょう。余談として delete も実装されるとか話がかなり前に出てた気がしますが来ないですね。

順序保証

順序保証を保つため、Cosmos DB Trigger の Function App はシングルスレッドで動作します。つまりスケールアウトはしないです。

実行回数や処理の重さ次第ってのはありますが、実運用を考えると Function App のホスティングプランは App Service plan が適しているケースが多いです。別の App Service plan がある場合は相乗りさせてコストを抑えるのもよいでしょう。

さっと検証する程度なら Consumption (Serverless) プランでよいと思います。

リトライポリシー

公式ドキュメントではリトライポリシー ( "再試行ポリシー" って書かれてますが...) は「構成できません」と書かれていますが、この PR にある通り v4 はサポートされています (されなそうだったけど、無事にサポートされました) 。以下ドキュメントの「再試行戦略」セクションに設定方法があります。

リトライについては、Feed をどう活用したいか次第で構成方法を検討する必要があります。
例えば「データの変更があったら絶対にそのフィードは取得したい、例外でスキップされたらデータがおかしくなるので絶対に許せん」ってときはリトライは無限に構成し、プログラムを修正して例外が発生しなくなるまで Feed は前に進めないという考え方が必要なこともあるでしょう。

例外がおきてもとりあえず進めて Feed を進めて問題ないときは、ログとか poison queue 的なところに対して後で処理を再開させる的なことも考えられますが、その際は順序性が崩れます。それを加味する必要性の有無含め冪等性を考慮する必要があります。

複数のデータが Feed されるときに気を付けたい冪等性

コードから想像がつくと思いますが、Feed の取得は1件ずつではなく複数のデータを取得してきます。ここで気にすべきことは冪等性です。

例えば10件の feed を取ってきます。5件は正常に処理して6件目で例外が発生した場合、リトライが設定されていると10件とるところからはじまります。そのため、もう一度処理が流れても冪等性を保つ実装が必要になります。

MaxItemsPerInvocation を指定してうまくやるって方法もありますが、冪等性を保つコードはどんな時も検討して実装するのがよいです。

おわりに

Tips を書きだすとまだまだ出てきて長くなるので、今回はこの辺にしておこうと思います。

まず最初に読んでおいた方がいいドキュメントのリンクをはっておきます。日本語の翻訳が追いついておらず内容が古いこともあるので、新しいことについては英語のドキュメントを確認するのがお勧めです。