BigQueryとFluentd、Google Apps Scriptを使ってビジネスKPIを監視してSlackに通知する

Posted by SpaceAgent Tech Blog スペテク on Wednesday, May 22, 2019

リードエンジニアの松田です。 今期からOKRが導入されました。過渡期で混乱しつつも頑張っています。

今回は、ビジネスKPIの監視についてです。 弊社では毎朝9時30分前後にSlackでKPIを通知するように設定しているので、その方法について紹介します。

監視しているKPIと技術選定

弊社が提供している不動産ポータルサイト「民泊物件.com」では、物件数とお問い合わせ数を主要なKPIとしています。 物件については入居者が決まった場合は成約済みに、問い合わせがありほぼ契約に至りそうな場合には掲載をストップする、など複数の状態があります。

その状態遷移も弊社では重要と考えているため、変更のたびに記録を残したいと考えました。 データベースでは最終的な状態のみを保持している(アンチパターンなので本当は修正したいのですが…)ため、変更の情報を残すため、BigQueryとfluentdを使うことにしました。 BigQueryは他の指標計測などで利用していたので引き続きの利用、fluentdはリアルタイムにログを残す必要があるため新たに導入しました。

KPI監視の流れ

以下のような流れでKPIの監視を行い、Google Apps Script(以下、GAS)からBigQueryのAPIを使って定期的にSlack通知しています。

CakePHP3でファイルを変更しFluentdでBigQueryにデータを流し込んでいます

KPI監視の方法

ここからは具体的な方法をコードベースで紹介していきます。

Webアプリケーション => Fluentd

CakePHP3のログ機能を使っています。デフォルトではログレベルが出力されてしまうため、独自にロガークラスを作って対応しました。 https://book.cakephp.org/3.0/ja/core-libraries/logging.html

FluentdのDockerコンテナを作成する

ローカルで動作確認を行うためDockerコンテナを作成していきます。 docker-compose.ymlにfluentdを追加します。ログ監視対象のファイルにアクセスできるよう、Webアプリケーションのディレクトリをvolumesに書いておきます。

# docker-compose.yml
  fluentd:
    build: ./fluentd
    ports:
      - "24224:24224"
    volumes:
      - ./fluentd/conf:/fluentd/etc
      - ./XXX:/XXX

Dockerfileでは、bigdecimalとfluent-plugin-bigqueryを追加しておきます。

プラグインはカイゼンプラットフォームさんが作られているものを利用しています。 こういったライブラリを公開してくださっている企業さん、本当に助かります。 できればスペースエージェントでも、何かオープンソースとして公開していきたいですね。 https://github.com/kaizenplatform/fluent-plugin-bigquery

# fluentd/Dockerfile
FROM fluent/fluentd:latest

RUN apk add --update --virtual .build-deps sudo build-base ruby-dev \
 && sudo gem install bigdecimal \
 && sudo gem install fluent-plugin-bigquery \
 && sudo gem sources --clear-all \
 && apk del .build-deps \
 && rm -rf /var/cache/apk/* /home/fluent/.gem/ruby/2.5.0/cache/*.gem

ファイルの変更を監視する

fluentdのsourceディレクティブを使ってCSVファイルが変更された際、起動できるようにしておきます。 CSVファイルを監視する場合はkeysを指定しておく必要があるので要注意です。

# fluent.conf
<source>
  @type tail
  path /XXX/event.log
  tag event.log
  pos_file /XXX/event.log.pos
  format csv
  keys A, B, C
</source>

Fluentd => BigQuery

matchディレクティブでBigQueryへのデータ保存設定を行います。 filterを設定しない場合、BigQueryに保存する項目以外も送信されてしまいエラーになるため、 renew_record, keep_keysを使ってBigQueryに保存するレコードのみになるように調整します。

BigQueryのjson_keyの取得はこちらに手順が載っています。(あとでURLをはる)

<filter event.log>
  @type record_transformer
  renew_record true
  keep_keys room_id, status_display_time, created
</filter>
<match event.log>
  @type bigquery_insert

  auth_method json_key
  json_key /fluentd/etc/key.json

  project projectID
  dataset datesetName
  table   tableName

  schema [
    {"name": "A", "type": "INTEGER"},
    {"name": "B", "type": "INTEGER"},
    {"name": "C", "type": "INTEGER"}
  ]
</match>

GAS => Slack

ここまででBigQueryへのデータ保存ができたので、その結果をSlackに通知していきます。

BigQueryからデータを取得する

まずは、GASからBigQueryにアクセスするための設定を行います。 Google SpreadSheetからスクリプトエディタを開き、「リソース」=> 「Googleの拡張サービス」と選択していくと、 このようなモーダルが出るため、BigQuery APIを有効にします。

下記のようにScriptを書くとBigQueryからデータを取得できます。 引数のqueryにはそのままBigQueryで使うクエリを渡します。一度、BigQueryのコンソール上でクエリを作っておくとやりやすかったです。 取得したデータはかなりネストされているような構造になっているので、デバッグツールで確認しつつ目的のデータを取り出しましょう。

コードは公式ドキュメントにもあるので、こちらも参考にしてみてください。 https://developers.google.com/apps-script/advanced/bigquery

// kpi_report.gs  

function getRows(query) {
  var projectId = 'projectId';
  var request = {
    query: query,
    "useLegacySql": false
  };
  var queryResults = BigQuery.Jobs.query(request, projectId);
  var jobId = queryResults.jobReference.jobId;

  // Check on status of the Query Job.
  var sleepTimeMs = 500;
  while (!queryResults.jobComplete) {
    Utilities.sleep(sleepTimeMs);
    sleepTimeMs *= 2;
    queryResults = BigQuery.Jobs.getQueryResults(projectId, jobId);
  }

  // Get all the rows of results.
  var rows = queryResults.rows;
  while (queryResults.pageToken) {
    queryResults = BigQuery.Jobs.getQueryResults(projectId, jobId, {
      pageToken: queryResults.pageToken
    });
    rows = rows.concat(queryResults.rows);
  }
  return rows;
}

Slackに通知する

Slackへの通知はwebhookにリクエストを送るだけで良く、このような感じになります。 URLは事前にSlackのincoming webhookから取得しておきます。

function notificate(message, channel) {
  var data = {
    "channel": channel, 
    "text": message, 
  };
  var payload = JSON.stringify(data);
  var options = {
     "method":"POST",
     "payload":payload
  };
  var res = UrlFetchApp.fetch("https://hooks.slack.com/services/XXX", options);
}

通知画面

実際の通知はこのようになりました。slackで見れるとやっぱり楽ですね。

CakePHP3でファイルを変更しFluentdでBigQueryにデータを流し込んでいます

まとめ

やっぱりBigQueryがあると分析が捗ります。 この記事では具体的すぎるため書けませんでしたが、分析の過程で初めてWindow関数を使いました。普段はフロントエンドをやっていることが多く、SQLを使う機会が多くなかったので良い学習機会になりました。

今回、fluentdを初めて利用したのですが、思ったより何でもできそうで驚きました。 ファイルの監視だけでなく、HTTPリクエストなど様々なものが監視できるようです。何でもできるからと言ってむやみに使うのはよくないと思いますが、fluentdも選択肢のひとつとして持っておこうと思いました。