BEACHSIDE BLOG

Azure とか C# 好きなエンジニアの個人メモ ( ・ㅂ・)و ̑̑

Cosmos DB の Change Feed 機能と使い方 ( Azure )

Azure Cosmos DB の Change Feed は、データの変更を検知してイベントを発火できる超便利な機能です。Azure Functions の Cosmos DB Trigger を使うことで超簡単に利用できます。

個人的には近年 RDB はほぼ使ってなくて Cosmos DB ばっか使ってるわりに ChangeFeed の機能をブログで整理したことないなーと思い、改めて自分の知識の整理のためにメモ。

個人的に RDB を使わず Cosmos DB を使う最大の理由のひとつは、ChangeFeed 機能です。あとはスキーマレスなので運用上データスキーマの更新が楽だったりするのも好き。
( RDB と比べて優れてるってわけではなく双方にメリットもデメリットもあります。)

Change Feed 機能

概要

前述で雑に書きましたが、 Change Feed はデータが変更されたら検知する機能です。大事なのは、どんな課題に使えるか、使いたいかですね。

データの変更を検知できることによって、データの変更をトリガーにイベントを発火して幸せになるシナリオは山ほどあります。公式ドキュメントではこんな図で例として大きな3パターンを表しています。

f:id:beachside:20200303123645p:plain ※ Image source: Change feed in Azure Cosmos DB - overview

イベントソーシングなパターンやCQRS にめっちゃ使い勝手が良いです。そしてラムダアーキテクチャーを構築するのにもぴったりな機能ですね。クラウドネイティブみあるー♪

運用する上で気にかけておきたいポイントをいくつかピックアップします。

  • 挿入・変更の検知はできますが、2020/3時点では (意味深) 削除の変更は検知できない。回避策として、変更の種類がわかるプロパティを自分で用意して削除の場合は TTL ( Time to Live: 一定の期間が経過したら自動的に削除する機能)をセットして削除するとかで運用できます。つまり、何の変更があったかを検知するデータスキーマの配慮が必要。ただ、これはデータの変更検知後に複数のイベントを発火したいならどのみち必要になるので Change Feed を有効に活用するなら最初から考慮しておきべき事項です。→追記: 2020年5月の更新で削除も検知できるようになりました!
  • "更新だけを対象で挿入は対象外"といった操作は不可。削除以外の変更は全て検知します。前述同様になりますが、変更の種類が分かるプロパティを用意して、Change feed で検知された後に必要なイベントへ振り分けてあげます。

ガチで使う際はさらに以下の公式ドキュメントあたりから読み始めるとよいです。

Leases (リース)コンテナーの存在

Change Feed を利用するには、データの保存に使っているコンテナー(雑に言うと RDB でいうテーブルみたいなもの)とは別に、 lease 用のコンテナーが必要になります。手動またはプログラムのオプションによって「無ければ作成する」ってのを書いてあげます。ここら辺の実装は後述します。

Change Feed の利用(Azure Functions - Cosmos DB Trigger)

Change Feed を使うには、Azure Functions の Cosmos DB Trigger での利用が最も容易かつ実用的できます(と個人的には思っています)。
ここではローカル環境(Cosmos DB エミュレーター と Visual Studio 2019 によるローカルデバッグによるコンビネーションアタック)を使って進めましょう。このエミュレータがあるおかげでローカルでの開発が捗るのも CosmosDB の好きなポイントです。

(Azure 上で Cosmos DB と Azure Functions を実際に使ってでやるのとほぼ変わりません)

環境準備

Cosmos DB エミュレーターは、こちらの "Microsoft ダウンロードセンター"をクリックしてダウンロードとインストールが可能です。
インストール後、Cosmos DB エミュレーターを起動するとブラウザーで https://localhost:8081/_explorer/index.html が開かれます。このポータルでデータの確認ができます。
QuickstartPrimary Connection String があります。この値はあとで使うのでこのまま開いておきましょう。

f:id:beachside:20200303121103p:plain

Azure Functions - Cosmos DB Trigger の実装

Visual Studio 2019 で Cosmos DB Trigger の Azure Functions を作ります。Visual studio を起動して、新しいプロジェクトの作成で Azure Functions のプロジェクトを作成します。言語はもちろん C# です。そして右下の 次へ をクリックします。

f:id:beachside:20200228154823p:plain:w600

プロジェクト名とかは適当に入力して 作成 をクリックします。

f:id:beachside:20200228155311p:plain:w600

左上のバージョンはもちろん V3 を選択します。
トリガーは Cosmos DB Trigger を選択します。 右側で、 Connection String はからっぽでよいです。
Database nameCollection name は適当に入れます。私は今回 SampleDbItem と入力しました。 作成をクリックします。

f:id:beachside:20200228155246p:plain:w600

テンプレートからコードが出来上がったら、とりあえずソリューションエクスプローラーで local.settings.json を開きます。
このファイルは環境変数を設定できるファイルですが、ファイル名から察する通り、gitignore されているのでそれを変えない限りソースコード管理には入りません。簡易な機密情報のストアだと思ってさい(雑)。
ローカルデバッグ用の環境変数って話なので、Azure 上の Function App の環境変数(構成の中のアプリケーション設定で設定してるやつ)にも登録して以下のように編集しましょう。デフォルトのコードに CosmosDb: から始まるやつ3つ(接続文字列、Db名、Container名を追加しただけです。
CosmosDb:ConnectionString の値は、Cosmos DB エミュレーターのポータル画面(さっき開いたhttps://localhost:8081/_explorer/index.html )の Quickstart の Primary Connection String を貼り付けます。ローカルで動くものなので皆さん一緒だったはず。

{
    "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "CosmosDb:ConnectionString": "AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "CosmosDb:Database": "SampleDb",
    "CosmosDb:Container": "Item"
  }
}

次に Function1.cs を開きます。生成されたコードをちょっとだけ変更して以下のようにしました。

CosmosDBTrigger ってそもそも Custom Bindings なんですが、その作り次第で環境変数の取り方が変わります。
そこら辺に関してポイントが何点かありますので解説。

環境変数からのデータ取得(12-13 行目)

そのひとつが、12行目と13行目。こいつらは % で囲んだ値を書いてあげると、環境変数から取得されます。なぜかは...仕様です(雑か)。あとで説明します。

環境変数から接続文字列取得(14行目)

前述で、% で囲んで環境変数って書いたのにここでは囲まれてない?!...そうなんです、仕様です(雑か)。CosmosDBTriggerAttributeConnectionStringSetting プロパティには [AppSetting] アトリビュートがついているためです。これがついてないと、%で囲って取ってくる(正確には、class に Binding アトリビュートをつけることで可能にするんですが)。
詳しくはこちらに書かれています。

Lease 用の Conatainer名のセット(15行目)

さて次はどっちの書き方だ? 実装を確認するとこのプロパティは [AppSetting] アトリビュートの無いプロパティなので、% で囲まないと環境変数からデータを取得できません。ここで書かれてる値がそのままの設定されます。

ちょっとややこしいですかね。原理が分かると簡単です。

CreateLeaseCollectionIfNotExists =true (16行目)

これを true にすることで Lease 用の Container が無ければ勝手に作ってくれます。必須の設定ですねー。

と、あえて色んな書き方してみたサンプルでした。

デバッグ実行

Cosmos DB エミュレーターのポータルで Explorer を開いて、Database とその中に Container を作りましょう。先ほどのコーディングで local.settings.json に設定した名称で作ります( 上部の Container を作るアイコンをクリックすると Database の作成もできます)。

Cosmo DB の SDK だと CreateIfNotExists 的なメソッドが Database や Container に用意されているんですが、CosmosDB Trigger にはそのオプションが見当たらないので先に作っておきます。

出来上がるとこんな感じ。

f:id:beachside:20200228181915p:plain:w400

では、先ほど作った Azure Functions の19行目あたりで適当にブレークポイントを貼っておきましょう。そしてデバッグ実行!

実行できた時点で、Cosmos DB エミュレーターのポータルを更新すると leases の Container が生成されたことが確認できます。

f:id:beachside:20200228182443p:plain:w400

エミュレーターのポータルからデータを入れてみましょう。Item Container の中の Items をクリック > New Item をクリックして、Json で値を適当に入れます。

最後に Save をクリックして保存します。

f:id:beachside:20200228182943p:plain

保存するとすぐに Functions のコードでブレークポイントがヒットします。正常に動いたことが確認できました。

f:id:beachside:20200228183347p:plain

後は、ここから Stream Analytics や EventHub や Storage の Queue とかデータを流して何らかの処理をするのがあるあるなやつです。簡単かつ超絶 Cool ですね♪

レイテンシー(ディレイ)はいかほどに?

Cosmos DB を更新した後、Change Feed が動くまでの遅延がどれくらいか気になりますね。特に Azure Functions の従量課金(Consumption) プランだとコールドスタートがあります。

なーんて思ったりしそうですが、Cange Feed は change feed processor library によってポーリングしてるだけなので、Cold Start を気にするよりもポーリングの間隔を気にすればよいです。

デフォルトは 5 秒です。CosmosDBTriggerFeedPollDelay に値を設定してあげることで変更できます。

f:id:beachside:20200303135656p:plain

CosmosDBTriggerFeedPollDelay を5秒と1秒に設定したときで動作の違いをモニターしたらこんな感じになりました。
横軸は時系列で、縦軸はミリ秒です。ちなみにレイテンシーの計算には、Cosmos DB にデータのタイムスタンプ ( _ts ) と Change feed の Function App に到達した時間の差を求めています。

5秒おきにデータを挿入し続けながら、最初はポーリングの間隔である FeedPollDelay を 5秒、AM4時の途中から 1秒に変更した結果です。

f:id:beachside:20200303135854p:plain

FeedPollDelay を5秒にしたときの動きが周期的なのでなんかプログラム間違ったかなーと疑いましたが問題なさそう。
ちなみに 5分毎、30毎、1時間毎にデータを挿入し続けても想像できそうな(特に問題のない)結果になりました。

レイテンシーをチェックできるコードに更新

本題からちょっと離れますが、Application Insights のカスタムテレメトリーを使って App Insights で良い感じにチャートで表示できるようにしてみましょう。

Timer Trigger の Function App

特定の時間ごとに Cosmos DB にデータを Insert する Function App をこんな感じに作りました。クラス名は適当にもほどがあるイケてなさでここで後悔することを考えてなくて後悔してます(でも直さない)。

Cosmos DB の出力バインドを使っているので、データの挿入も実質的には 21 行目の1行だけで実現してます。 これがバインドパワー!(とは

15 行目で out で dynamic な data を定義しているので、21行目で代入してあげるだけで Cosmos DB へのデータの Insert が完了します。
ちなみに ID を指定しなければ GUID が勝手に付与されるんですが、コードで書くことが無さ過ぎたので書きました。
また、既存の ID を指定すれば Insert ではなく update になります。

Cosmos DB Trigger の Function App

Cosmos DB Trigger の方はこんな感じ。App Insghts 使うので、Nuget で Microsoft.ApplicationInsights (現時点最新の v2.13.1 )をインストールしてます。

ざっくり解説ですが、

  • 14 - 20行目: App Insights のテレメトリーを DI で取得します。デバッグ環境でも動くように DI の設定するのがめんどかったので #if で回避しました。なんという雑さでしょう。
    そんなことはさておき Azure 上で動くときは、これで App Insights にデータを送ることができます。
  • 40行目: Cosmos DB にデータが更新された際の情報は、_ts に UnixTime で格納されています。ということで UTC の普通の時間に変換しています。41行目で、この Function App に到達した時間との差分をミリ秒で latency にセットしました。

  • 43 - 45行目: 今回は TrackMetric で latency を送りました。前述したチャートも Log Analytics でこんな KQL を書けば見えた結果になります。

customMetrics
| project timestamp, value, name, cloud_RoleName
| where name =="CosmosTrigger Latency"
| order by timestamp desc
| take 1000
| render scatterchart
  • 46行目: 普通のログでも出してみました。Structured log (構造化ログ) で書いてるので、Log Analytics で見るときは、id はcustomDimensions.prop__id 、latency は customDimensions.prop__latency って書いて取得できますねー。ざっくり Log の状況をみるならこんな KQL って感じです。
traces 
| project timestamp, customDimensions.prop__latency, customDimensions.prop__id,operation_Name,severityLevel, message, cloud_RoleName 
| where operation_Name == "Function1"
| where severityLevel == 1
| where message startswith "Latency" 
| order by timestamp desc
| take 500

Azure での準備

後は Azure で環境作って Function App に環境変数をセットしてデプロイすれば OK ですねー。Application Insights を構成してあげることで、Log Analytics で紹介した KQL を使ってテレメトリーを見れます。

それだけなので特にここでは書きません。

終わりに

非常に簡単に使えるし、これだけでアーキテクチャーの幅が広がります。RDB と NoSQL はぞれぞれメリットデメリットがあるので、両方のメリットを生かしてハイブリッドに利用していくと良いと感じています。