Takeshi Takeda

Takeshi Takeda

Software Engineer

Yjs を利用した共同編集アプリ

Yjsを利用した共同編集アプリ

はじめに

ソフトウェアエンジニアの武田です。

アプリを開発するうえで、ユーザ間で同じ情報を同時に編集したい、という要求が求められることがあります。このような要求を実現しているアプリに MiroConfluence といったものが挙げられます。

同時編集機能の実装は自明ではありません。あるデータに複数人が同時に操作を加えた時に、それらの操作の整合性を担保し望ましい状態が得られるよう制御する必要があるからです。例えば、2つの要素を持つ配列型のデータ [A, B] が存在しているとき、「任意のインデックスの要素を削除できる」という機能の実装を考えます。各操作につき list.pop(target_index) を順次実行する実装を行ってしまうと、複数人のユーザが A を削除したいと考え「1つめの要素を削除する」という操作を同時に行ったとき、[A, B] => [B] => []という遷移を経て空配列が得られてしまいます。各ユーザは[A, B]という配列に対して「1つめの要素を削除する」という操作を行っているため、[B]という配列が最終的に得られることを期待しているはずです。

このように、同時編集機能の実装は自明ではありませんが、Yjs というライブラリを利用することで比較的簡単に実現できます。

本記事では、Yjs ライブラリの紹介・基本的な使い方・使う上で考慮すべき点を紹介します。

Yjs とは

Yjs は、あるデータについて複数人が同時に編集操作をしたとき、それらの操作を自然な形でマージしてくれるフレームワークです。この機能を実現するために、Yjs の内部では YATA と呼ばれるアルゴリズムを実装しています。詳細は省略しますが、要約をすると以下のような特徴があります。

  • データに対する変更操作は可換な操作のみを実装している。すなわち、あるデータに対する2つの操作 A と B が存在しているとき、以下は同じ結果が得られる。
    • A を適用した後に B を適用する。
    • B を適用した後に A を適用する。
  • 各ユーザは同じデータから編集を開始し、定期的に互いの操作を共有し、自身のデータへ反映させる。可換な操作のみを認めているので、操作を反映する順番がユーザ間でそれぞれ異なっていても、最終的には同じデータを復元できる。

ネットワークを通じてデータを共有するためのライブラリも提供されており、Websocket 通信や P2P 通信を行うことも可能です。これらのライブラリも用いることで、「ブラウザ上で編集したデータが、即座に別のユーザの画面にも反映される」といったアプリを開発することができます。 また、ネットワーク通信が切断されたとしてもオフライン下でデータの編集を続けることが出来ます。ユーザがオンラインに復帰したタイミングで、ローカルに反映していたデータの変更を他ユーザに通知し、また他ユーザの変更を自分のデータへ反映させることが出来ます。先に述べたように、データに対する変更は可換な操作のみを実装しているため、どのような順番でそれらの操作が適用されたとしても、最終的には全ユーザが同じデータを共有することになります。

Yjs では以下のデータ構造に対して、「挿入」と「削除」の可換な操作を実装しています。

  • Array
  • Map
  • Text
  • XmlFragment(XmlElementの配列)
  • XmlElement

Yjs は ProcessMirrorQuill といったテキストエディタをサポートしていますが、Array や Map といったデータ構造も使用できるため、テキストエディタに限らず幅広いアプリを作成することができます。

Yjs の基本的な使い方

Yjs を利用した基本的なデータ更新方法について紹介したいと思います

import * as Y from 'yjs';

const ydoc = new Y.Doc();
const yarray = ydoc.getArray('myarray');
yarray.insert(0, ['A']);
yarray.insert(1, ['B']);
console.log(yarray.toJSON()); // => ['A', 'B']

const ydocRemote = new Y.Doc(); // リモートユーザをシミュレート
Y.applyUpdate(ydocRemote, Y.encodeStateAsUpdate(ydoc));  // オリジンユーザの状態を反映させる
const remoteArray = ydocRemote.getArray('myarray');
console.log(remoteArray.toJSON()); // => ['A', 'B']

yarray.delete(0, 1);  // ['A', 'B']の配列に対して、0番目の要素を削除することを試みている
remoteArray.delete(0, 1);  // リモートユーザも同じく、['A', 'B']の配列に対して、0番目の要素を削除することを試みている

Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(ydocRemote));  // 自身の操作とリモートユーザの操作をマージする

console.log(yarray.toJSON()); // => ['B']

上記の例では、2つの要素が挿入されている Array 型のデータに対して、2人のユーザーが同時に同じ index の要素を削除したときの挙動をシミュレートしています。ナイーブな削除処理を行ってしまうと最終的に得られる配列は空配列となってしまいますが、Yjs は同時に行われた削除処理を自然な形でマージしてくれます。

続いて、基本的なシステムの全体構成について簡単に紹介します。 共有データを編集する処理はフロントエンドに記述し、フロントエンドでデータを処理します。編集されたデータは、y-websocket を利用することで、Websocket 通信によって他ユーザに変更差分を通知・反映させることが出来ます。Websocket 通信を行うためには、y-websocket サーバーを動かすためのバックエンドサーバーも必要になります。ここまでを用意することで、低レイテンシの双方向通信による同時編集ができるようになりますが、Yjs および y-websocket にはデータの永続化機能が提供されていません。データを永続化したい場合、y-redis や y-postgresql といった追加のライブラリ、及びデータの実体を保存しておく RDB や NoSQL を用意する必要があります。

基本的なデータ操作、及びシステムの全体構成に関する紹介は以上になります。共同編集アプリを作成する土台は整いましたが、Yjs を利用するうえで考慮すべき点は他にもいくつか存在します。

  • 高度な操作の実現について
    • Yjs では、データに対する可換な操作として「挿入」および「削除」を提供している。「挿入」および「削除」以外の高度な操作(例えば、「ある要素を別の場所に移動させる」といった移動操作)はどのように実現したら良いのか。
  • バックエンドサーバーのより詳細な構成について
    • バックエンドサーバーは高負荷になったら自動スケールさせることがある。Websocket 通信を行う都合上、特に対策をしないのであれば、通信を行いたいユーザ同士を常に同じマシンに接続させる必要がある。通信が可能な状態を保ったまま、バックエンドサーバーを自動スケールするためにはどうしたらよいか。
    • y-websocket を動かすバックエンドサーバーにおいて、どのような手段でユーザ認証を実装すべきか。

以降の章では、これらの点についてより詳細に紹介したいと思います。

高度な操作の実現方法(移動操作)

ある配列に対して2つの異なるインデックス i, j が与えられたとき、i番目の要素をj番目の要素の後ろへ移す操作のことを移動操作と定義します。いわゆるカンバンボードのようなアプリを作るときに、移動操作を実装する必要が生じます。

Yjs では挿入と削除が実装されているので、それらを組み合わせることで、移動操作を一見実現できそうです。

注意: 擬似コードです
function moveYArray(yArray, i, j) {
  const tmp: string = array.get(i);
  array.delete(i);
  array.insert(j, tmp);
}

同時編集の観点では、このアルゴリズムは上手く機能しません。具体的な例を見ながらアルゴリズムの動作を追っていきます。

  • yArray = [A, B, C]という配列データが存在し、この配列データに対して2人のユーザが同時に以下の移動操作を実行したと仮定する。
    • 1人目のユーザ: moveYArray(yArray, 0, 1). すなわち、AB の後ろに移動させることを期待している。
    • 2人目のユーザ: moveYArray(yArray, 0, 2). すなわち、AC の後ろに移動させることを期待している。
  • 1人目のユーザの操作が適用されると、[A, B, C] => [B, C] => [B, A, C]という遷移を経て、[B, A, C]という配列が得られる。
  • 1人目の操作反映後、2人目のユーザの操作が以下のように適用される。
    • まず初めに「最初の要素を削除する」という操作が試みられるが、「[A, B, C]に対して1番目のデータを削除する」という操作は1人目のユーザの処理によって完了しているため、削除処理は行われない。
    • 次に、「C の後ろに 、一時変数として保存したA を挿入する」という操作が試みられる。この操作はまだ行われていないので実行される。1人目のユーザの処理結果と併せ、最終的には[B, A, C, A]という配列が得られる。

このように、同じデータを同時に別の場所へ移動させようとすると、データがコピーされてしまいます。 この課題を解決するアプローチはいくつか考えられそうです。一つには「同じデータを別の場所へ同時に移動することを禁止する」という案が考えられそうですが、「ネットワークが分断されていても個々が持つデータ内に操作を反映させることができ、通信が可能になったらそれらの操作を互いに反映させる」という機能が Yjs にはあります。分断耐性がある以上、「自分が行おうとしている操作に対してロックを獲得する」といったようなアプローチは難しいことが予想できます。簡単かつシンプルなアプローチとして「矛盾した状態が発生したら、その状態を解消する」というものが考えられます。より具体的には以下のとおりです。

  • 各データには一意となる識別子を割り当てておく。移動操作は、挿入操作と削除操作を組み合わせたナイーブな実装を行う。ナイーブな実装では移動操作後にデータがコピーされうるが、それを検知し次第重複しているデータを削除する。
    • 「挿入」と「削除」によるナイーブな実装はフロントエンドで実装する必要があります。
      • 先ほど紹介した擬似コードと同等の処理を実装すればよいです。
    • 重複データの削除処理は、技術的にはフロントエンド・バックエンドどちらでも実装することが出来ます。以下を考慮すると、フロントエンドで実装することが無難だと考えられます。
      • 基本的に、データの編集処理はフロントエンドで実装される。編集処理をフロントエンド・バックエンド両方に実装してしまうと、データの変更がどこでどのように行われているのか追いづらくなる。
      • 先の例において、「1人目の操作を反映した後、2人目の編集処理を適用する段階で、2人目のユーザの通信が途切れてしまった」というシナリオを考える。通信が途切れた後は一時的にローカル環境へ操作が反映されるため、バックエンドで重複削除処理を実装してしまうと、オンラインに復帰するまで [B, A, C, A] が2人目の画面に表示されてしまう。

Yjsでは、データの更新イベントをトリガーとして任意の処理を実行する機能が提供されているので、上記の実装を行うことは容易です。

...
yDoc.on("updateV2", (update: Uint8Array): void => {
      // データの更新を反映する
      Y.applyUpdateV2(yDoc, update);
      // ここで重複データ削除処理を実行する
      deleteDuplicatedID(yDoc);
});

バックエンドの構成

バックエンドには以下の機能を求められることが多いと思います。

  • 高負荷になった場合、バックエンドサーバーを簡易にスケールアウトできる。
    • Websocket を利用する場合、(何も工夫をしないのであれば)通信を行いたいユーザ同士を常に同じバックエンドサーバーへ繋ぐ必要が出てきます。
  • データを永続化できる。
  • ユーザ認証を行うことができる。

簡易にスケールアウトを行え、かつデータを永続化できる手段として y-redis の利用が挙げられます。y-redis はバックエンドサーバーの他に Redis および RDB/Storage を利用し、バックエンドが自動スケールしたときでも常に互いに通信を行うことが出来るようになります。 y-redis を利用したときのアーキテクチャ概要図は以下のとおりです。

バックエンドアーキテクチャ

y-redis は server componentworker component の2つのコンポーネントで構成されています。server component はバックエンドサーバーと Redis を繋ぐ責務を持っており、このコンポーネントのおかげで、ユーザはどのバックエンドサーバーに Websocket のコネクションを貼っても、互いに通信することができます。worker component は Redis と RDB / Storage の間で仕事をするコンポーネントで、定期的に Redis に存在するデータを RDB / Storage に永続化する役割を持ちます。

y-redis を導入することで、スケールアウトを考慮したバックエンドサーバーを簡易に実装できます。

一方、y-redis を利用する場合はAGPLとして利用するか、ライブラリ作成者に連絡を取って有償のライセンス契約を結ぶ必要があります。 AGPL として利用する場合、ソースコードを公開する必要があるため注意が必要です。

y-redis の代替として、y-postgresql を利用する手段も考えられます。こちらは MIT ライセンスとされているため、比較的自由に利用することが可能です。y-postgresql を利用すると、バックエンドサーバーを直接 postgresql へ接続し、データの永続化を行うことが出来ます。 こちらのライブラリをそのまま使う場合、y-redis とは異なりバックエンドサーバーのスケールアウトには対応できませんが、Postgresql の LISTEN および NOTIFY を利用することで対応できます。

具体的には、Postgresqlで以下のようなテーブル及び通知関数・トリガーを用意しておきます。

-- y-postgresql が変更操作を保存しておく目的で利用するテーブル
-- アプリ開発者が中身を知る必要は基本的にはない
CREATE TABLE public."yjs-writings" (
  id serial4 NOT NULL,
  docname text NOT NULL,
  value bytea NOT NULL,
  "version" public."ypga_version" NOT NULL,
  CONSTRAINT "yjs-writings_pkey" PRIMARY KEY (id)
);

-- 通知を送信する関数を作成
CREATE OR REPLACE FUNCTION notify_change()
RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify(
    'change',
    json_build_object(
      'operation', TG_OP,
      'record', row_to_json(NEW)
    )::text
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- トリガーを作成
CREATE TRIGGER change_trigger
AFTER INSERT OR UPDATE OR DELETE ON "yjs-writings"
FOR EACH ROW
EXECUTE FUNCTION table_change();

バックエンドサーバーでは、以下のようなクラスを用意します。

import pg from "pg";

export class DatabaseListener {
  private config: pg.ClientConfig;
  private client: pg.Client | undefined;
  private retryTimeout: number;
  private docNotificationHandlers: Map<string, (msg: pg.Notification) => void>;

  constructor(config: pg.ClientConfig) {
    this.config = config;
    this.client = undefined;
    this.retryTimeout = 5000; // 再接続までの待機時間(ミリ秒)
    this.docNotificationHandlers = new Map<string, (msg: pg.Notification) => void>();
  }

  // DBとの接続を実行し、インベントハンドラーを登録する
  async connect() {
    try {
      this.client = new pg.Client(this.config);
      await this.client.connect();
      await this.client.query('LISTEN change');

      this.client.on('error', this.handleError.bind(this));
      this.client.on('end', this.handleDisconnect.bind(this));

      console.log('データベースに接続しました');
    } catch (err) {
      console.error('接続エラー:', err);
      this.scheduleReconnect();
    }
  }

  // DBから変更が通知されたときに行いたい処理を登録する関数
  onNotificationHandler(docName: string, handler: (msg: pg.Notification) => void): void {
    if (this.client == undefined) {
      console.error('データベースに接続されていません');
      return;
    }
    const targetHandler = handler.bind(this);
    this.docNotificationHandlers.set(docName, targetHandler);
    this.client.on('notification', targetHandler);
  }

  // データベースで何かしらのエラーが発生したに実行する関数(再接続処理を実装している)
  handleError() {
    console.error('データベースエラー');
    this.scheduleReconnect();
  }

  // データベースとの接続が切断されたときに実行する関数(再接続処理を実装している)
  handleDisconnect() {
    console.log('データベース接続が切断されました');
    this.scheduleReconnect();
  }

  // 再接続処理
  scheduleReconnect() {
    console.log(`${this.retryTimeout/1000}秒後に再接続を試みます...`);
    setTimeout(() => this.connect(), this.retryTimeout);
  }
}

このクラスを以下のように利用することで、postgresqlを介して、異なるバックエンドサーバー間に接続しているユーザー間でもデータを共有することが可能になります。すなわち、バックエンドサーバーのオートスケール機能を利用することが出来ます。

const databaseListener = new DatabaseListener({
  // 省略...
})
databaseListener.connect();

setPersistence({
  bindState: async (docName: string, ydoc: IWSSharedDoc) => {
    // 省略...
    databaseListener.onNotificationHandler(docName, async (msg) => {
      const payload = JSON.parse(msg.payload ?? '');
      // Postgresqlからの通知は、関心のないドキュメントの更新によっても発生する
      // したがって、関心があるドキュメントが更新された場合のみ、yjsの状態を更新する
      if (docName === payload.docname) {
        const persistedYdoc = await pgdb.getYDoc(docName);
        Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc));
      }
    })
  },
  ...
});

最後に、バックエンドサーバーにおける認証およびセキュリティ対策について、簡単に紹介します。 Websocket 通信は、HTTP リクエストによるハンドシェイクを行った後、双方向通信が開始されます。最初の通信が HTTP リクエストであるため、このときにブラウザから Cookie を送ってもらうことで、正規のユーザによる接続か否かを判定することが出来ます。 また、ユーザがサーバに接続を試みたとき、以下の点も追加検証することでよりセキュアな通信を実現することができます(引用: Webアプリ開発者のための HTML5 セキュリティ入門)

  • Originヘッダを確認する。
  • wssを使用し、通信を暗号化する。

y-websocketでは、デフォルトで Cookie による認証をサポートしていません。代わりに、クエリパラメータに認証情報を付与することでユーザを識別する機能を提供しています。こちらの手段でも認証を実現出来ますが、ログに残る可能性を考慮すると Cookie による認証のほうがよりセキュアであると個人的には考えています。 Cookie 認証は Websocket 通信を待ち受ける setupWSConnection のラッパーを用意することで実現できます

注意: 擬似コードです
function setupWSConnectionWithCookieAuth(conn, req, docName) {
  // originヘッダーの検証を行う
  if (req.headers.origin !== "許可しているオリジンか") {
		console.error('origin not allowed');
		conn.close();
		return;
	}

  // cookie が正しいかを確認する
  const cookie = Object.fromEntries(new URLSearchParams(req.headers.cookie?.replace(/; /g, "&")));
  if (!cookie || !cookie.yjsApiKey) {
    console.error("No cookie found");
    conn.close();
    return;
  }
  validateCookie(cookie)

  // 本来の処理を行う
  setupWSConnection(conn, req, docName);
}

const server = http.createServer();
const wss = new WebSocketServer({ server });
wss.on('connection', setupWSConnectionWithCookieAuth);

おわりに

本記事では、Yjs の紹介や基本的な使い方、使う上で考慮すべき点などを紹介しました。Yjs が利用しているアルゴリズムの詳細は割愛してしまいましたが、興味のある方はこちらの論文を参照してください。

以上、本記事の内容がどなたかの参考になれば幸いです。

エムシーデジタルでは、技術力向上のためのイベントや勉強会なども定期的に実施しています。 もしエムシーデジタルで働くことに興味を持っていただいた方がいらっしゃいましたら、カジュアル面談も受け付けておりますので、お気軽にお声掛けください! 採用情報や面談申込みはこちらから

RSS

Tags

Next

Masahiro Takahashi

Masahiro Takahashi

変数定義の工夫とホールの定理による混合整数計画ソルバーの計算時間の短縮

はじめに データサイエンティストの高橋です。 エムシーデジタルでは、現実世界の課題を数理最適化問題として定式化し、それを汎用のソルバーを用いて解くことをよく行なっています。 数理最適化問

  • #TechBlog
  • #数理最適化