🍃このブログは移転しました。
3秒後、自動的に移動します・・・。

Cloudflare WorkersのDurable Objectsでチャットを実装する

っていうコードが公式のリポジトリにあって、なかなか読み応えのあるコードだったので記事に。

GitHub - cloudflare/workers-chat-demo

はじめに

Cloudflare Workersは、

  • CDNのエッジで動かせるWorker
  • ServiveWorkerみたいなコードを書く
  • Node.jsが動くわけではなく、プロトコルもHTTPしか話せない

ただ、HTTPが話せるならWebSocketが使えるのでは?って気付いてコード漁ってたら、ドンピシャなコードを見つけたという。

本旨としてはDurable Objectsのデモだけれども、他にもいろいろ気になる感じだったので、ちゃんと読んでみたらいろいろと発見があった。

Durable Objectsとは

端的にいうと、Workerから参照できる共通の状態。

WorkerAからもWorkerBからも同じ参照にアクセスできるので、トランザクションなどを気にせずdurableな状態として扱えるというもの。

Workers Durable Objects Beta: A New Approach to Stateful Serverless

この公式BlogのQ&Aがなかなか含蓄のあるものだったので、それもメモっておく。

DOでWebSocketを配信できるの?

できる。

DOをβ公開するにあたり、Workerそれ自体をWebSocketのエンドポイントとして使えるようにした。

DOのユースケースとして、もっとも便利だと思われるのがWebSocketを使ったアプリケーションのはず。
DOを識別するIDさえあれば、任意のWebSocketクライアントにメッセージングできるということ。

Workers KVとの違いは?

KVの永続化は、真にリアルタイムではない。

最終的にはすべてのロケーションに格納されるけど、その過程では特定のWorkerだけ古いデータがあったりする。
なので基本的に後勝ちで上書きされてしまうという前提がある。

そもそもKVをStorageとして見るなら、DOはまったく別物。
静的なコンテンツや設定、更新頻度の低いものを置いておく場所としては、引き続きKVは最適であり、オワコンではない。

ただ、パフォーマンス向上のために、内部的にKVの実装をDOにするかもしれないらしい。

DOも実行環境としてはシングルスレッドになるので、キャッシュできるものはKVに寄せるほうがいいし、あまりハードな計算をさせると、レスポンスが遅くなるはず。

なんでCRDTじゃないの?

(CRDTについてあまり知らない)

DOで想定したユースケースとしては、CRDTは大仰すぎるから。

もちろんDOの上に、独自のCRDTを実装することもできるので、お好きにどうぞ。

真にServerlessであるとは?

  • サーバーレスのアーキテクチャは、状態を持たないことが主流だった
    • 特定の関心ごとに、決められた処理を行う
    • だからわかりやすく、これまで支持されてきた
  • ただその反面、共通のデータや協調が必要になった場合は、別のサービスが必要になってしまう
  • 結局こういう層が必要になってしまうと、そもそものサーバーレスの目的を達せられない
    • スケールさせるためには?とか、リージョンは?とか考える必要があり、すごく煩雑になる
  • この問題を解決するため、特定の状態を分離して配置し、どこからでも参照できるようにした、それがDO
    • 状態の単位は、ユースケースに依存する
    • ECならカートごと、チャットならルームなど

とまあ、なかなかに壮大なことが語られていた・・・!

DOのAPIドキュメント

Durable Objects · Cloudflare Workers docs

使い方をざっと書き下す。

// これを、wrangler.tomlで指定する
// { binding = "mydo", class_name = "MyDO" } みたいに
export class MyDO {
  // 詳細は後述
}

// DOをリクエスト時に取得する
export default {
  async fetch(req, env) {
    const id = /* なんとかしてリクエストから手に入れる */;
    const mydo = env.mydo.get(id);

    // ID生成もできる
    env.mydo.newUniqueId();
    // 任意の文字列からも生成できる
    env.mydo.idFromName(name);
    env.mydo.idFromString(name);

    // DOに処理を投げる
    return mydo.fetch();
  }
}

// DOの詳細
export class MyDO {
  // おそらくデプロイ時に自動で呼ばれるので、参照をキープしておく
  constructor(state, env) {
    this.state = state;
    this.env = env;

    // this.xxx はご自由に初期化する
  }

  // Worker or 別のDOから呼ばれる(インターネットではなく、内部的な通信で)
  async fetch(req) {
    this.state.storage.get(); // put(), list(), delete(), transaction()

    // 通常のWorkerのと一緒
    this.state.waitUntil(promise);
  }

  // インスタンスメソッドもご自由に
  async yyy() {}
}

プリミティブではあるけど、割と直感的かなと思った。

`.mjs`

現状ドキュメントやらサンプルで一般的に書かれてるWorkerのコードは、ServiceWorkerのそれと一緒で、こんな書き口なはず。

// index.js
addEventListener("fetch", (ev) => { ev.request })
addEventListener("scheduled", () => {})

でもDOのサンプルではそうではなく、

// index.mjs
export default {
  async fetch(request) {}
  async scheduled(request) {}
}

export class DO {}

ってなってる。

どうやらこれはこの先そうなるかもよっていうコードだとか。
そして、現状βであるDOを使うためには、このシンタックスが必須らしい。

今までグローバル変数だった値たちは、`async fetch(request, env) {}`の第2引数から参照するようになる。これはこっちのほうがいいな。

ちなみにこれは、Modules-syntax Workerと称してるらしい。

いざコード

GitHub - cloudflare/workers-chat-demo

ファイルとしては2つだけ。

  • Workerとして動かすスクリプト
  • そのWorkerが返すHTMLをファイルに切り出したもの
    • `chat.html.bin`

UIとしては

特に特筆すべき点はない。(って、コードコメントにも書いてあるけど)

  • 名前の入力+次へ
  • ルーム名の入力+入室
  • チャット開始

ってだけ。

WebSocketでチャットというところでは、

  • 直接`new WebSocket("wss://")`してる
  • JSON文字列をやりとりしてる

くらい。

`.bin`

このUIのファイルの拡張子、なんでこんな風になってるの?って思ったら。

import HTML from "chat.html.bin";

というように、`.bin`の拡張子にしておくと、それをバイナリとして認識させられて、`content-type`だけちゃんとすれば、そのまま`Response`で返せると。

return new Response(HTML, {headers: {"Content-Type": "text/html;charset=UTF-8"}});

ただこれも、

However, the space available for assets served this way is very limited; larger sites should continue to use Workers KV to serve assets.

だそうです。

WebSocketPair

見慣れないやつがいると思ったら、CFWが独自で実装した秘密のクラスっぽかった。(そのうちドキュメントされるんかな?)

To accept the WebSocket request, we create a WebSocketPair (which is like a socketpair, i.e. two WebSockets that talk to each other), we return one end of the pair in the response, and we operate on the other end. Note that this API is not part of the Fetch API standard; unfortunately, the Fetch API / Service Workers specs do not define any way to act as a WebSocket server today.

なるほど。

const [clientWS, serverWS] = new WebSocketPair();

serverWS.accept();
// このレスポンスで、WebSocketのUPGRADEが成される
return new Response(null, { status: 101, webSocket: clientWS });

あとはいつものWebSocketってわけ。

serverWS.addEventListener("message", fn);
clientWS.send(JSON.stringify({ hello: "world" }));

DOは`fetch()`でしか操作できない

あくまで現時点では、らしいけども。

たとえばこのデモでは、レート制限が実装されてて、特定のIPからめちゃめちゃ書き込むことができないようになってる。

で、IPごとのレートを管理するためにもDOを使ってる。

ただDOを使うがゆえに、そのカウンターを操作するためにも`fetch()`する必要が出てくる。URLなんかないのに。

// みたく、validな適当なURLを指定するしかないらしい(どうせインターネットに出ないのでなんでもいい)
await this.limiter.fetch("https://dummy-url", {method: "POST"});

あとDOへの`fetch()`も、失敗することがあるって書いてあった。
まあDOもどこかのネットワーク上に配置されてるので、不通になることもあるってことか。

まとめ

最初のQ&Aにあった通り、発想自体は良さそうではある・・。

が、実行時パフォーマンスが気になるのと、要するにグローバル変数なので、扱うのはかなり難易度が高そうだなと思った。

Using Durable Objects · Cloudflare Workers docs

現状はまだβであり、制約も結構あるので、単に試す事自体もそれなりにハードやなというのが正直なところ。(自分もコード読んで満足してしまった)


単純な増減カウンターの実装がこうなるというのを見て、このお気持ちを察して欲しい。

export class Counter {
    constructor(state, env) {
        this.state = state;
    }

    async initialize() {
        try {
            let stored = await this.state.storage.get("value");
            this.value = stored || 0;
        } catch (err) {
            // If anything throws during initialization then we
            // need to be sure that a future request will retry by
            // creating another `initializePromise` below.
            this.initializePromise = undefined;
            throw err;
        }
    }

    // Handle HTTP requests from clients.
    async fetch(request) {
        // Make sure we're fully initialized from storage.
        if (!this.initializePromise) {
            this.initializePromise = this.initialize();
        }
        await this.initializePromise;

        // Apply requested action.
        let url = new URL(request.url);
        let currentValue = this.value;
        switch (url.pathname) {
        case "/increment":
            currentValue = ++this.value;
            await this.state.storage.put("value", this.value);
            break;
        case "/decrement":
            currentValue = --this.value;
            await this.state.storage.put("value", this.value);
            break;
        case "/":
            // Just serve the current value. No storage calls needed!
            break;
        default:
            return new Response("Not found", {status: 404});
        }

        // Return `currentValue`. Note that `this.value` may have been
        // incremented or decremented by a concurrent request when we
        // yielded the event loop to `await` the `storage.put` above!
        // That's why we stored the counter value created by this
        // request in `currentValue` before we used `await`.
        return new Response(currentValue);
    }
}