とんちゃんといっしょ

Cloudに関する技術とか日常とかについて書いたり書かなかったり

GCPを使ってサーバレスでSlackの統計データを取れるようにした話

この記事はSlack Advent Calendar 2018の7日目の記事です。

(8日目を書いた後に7日目が空いていたので書くことにしたため日付を前後して書いております)

TL;DR

  • SlackのEvent APIGoogle Cloud Platformを利用するとサーバレスかつ簡単にSlackのPublicなPostデータが管理できる
  • 溜まったデータもサーバレスかつ簡単に統計データを得ることができる
  • 個人でやるにはちょっとお高めかも

背景

800人近い人がワイワイしているSlackコミュニティに所属しているのだが、 昨年まで年間の発言数ランキングなどを作ってくれていた人がいなくなったこともあり、 今年の発言ランキングをどうしようかと考えていたが、 自分でつくろう、なんなら週間ランキングも出してしまおうと思い作ることにした。

ゴール

まずは1週間の発言ランキングを毎週月曜日の9時に #general にBOTがPostすること。

全体像

f:id:mazinlabs:20181212014400p:plain
SlackのデータをBigQueryに転送

f:id:mazinlabs:20181212014434p:plain
Cloud SchedulerからのイベントをもとにBigQueryから統計データを取得してSlackにPost

BigQueryにデータを投入する際にやったこと

1. Cloud Pub/Subの設定

Cloud FunctionsからBigQueryに転送するために利用するトピックを作成

2. Cloud Functionsの設定

SlackのEvent APIから送られてくるデータをCloud Pub/Subに送るためのFunctionを作成。

今回はPythonを選択。

  • main.py
from flask import make_response
from google.cloud import pubsub_v1
import os
import json


def _event_handler(data, event_type, subtype=None):
    if event_type == 'message' and subtype == None:
        publisher = pubsub_v1.PublisherClient()
        topic_name = os.environ['TOPIC_NAME']
        payload = {
            'channel': data['channel'],
            'user': data['user'],
            'ts': data['ts'],
        }
        publisher.publish(topic_name, json.dumps(payload).encode('utf-8'))
        
    return make_response('OK', 200)

def get_slack_event(request):
    slack_event = request.get_json()

    if 'challenge' in slack_event:
        return slack_event.get("challenge")

    if "event" in slack_event:
        event = slack_event.get("event")
        event_type = event.get("type")
        subtype = event.get("subtype")
        return _event_handler(event, event_type, subtype)

    return make_response("[NO EVENT IN SLACK REQUEST] These are not the droids\
            you're looking for.", 404, {"X-Slack-No-Retry": 1})
  • requirements.txt
flask
google-cloud-pubsub
  • 環境変数 TOPIC_NAMEの設定(2で作成したCloud PubSubのTopic名)

3. SlackのEvent APIを設定

  • SlackのIntegration枠を一つ使ってCreate New Appをする。
  • 「Subscribe to Workspace Events」に「message.channels」を設定

f:id:mazinlabs:20181216225630p:plain
Subscribe to Workspace Events に設定

Event SubscriptionsのRequest URL に対して3で作成したCloud Functionsのアクセス先を設定。

4. BigQueryの設定

今回はTimestamp, Channel ID, User IDを取得することにしているのでこの3つを設定。

f:id:mazinlabs:20181216233149p:plain
BigQueryで設定したテーブルのスキーマ

5. GCSの設定

次に設定するDataflowで利用するためのディレクトリを作成

6. Dataflowの設定

  • 「テンプレートからジョブを作成」を選択
  • ジョブ名を設定
  • 「Cloud Pub/Sub to BigQuery」を選択
  • 「Cloud Pub/Sub input topic」1で作成したTopicを設定
  • 「BigQuery output table」に4で設定したテーブル名を設定
  • 「一時的なロケーション」に5で設定したGCSのパスを設定

7. 動作確認

BigQueryでデータが取れることを確認。

Query

SELECT user, count(ts) as post FROM `table_name` 
group by user
order by post desc
LIMIT 10 

f:id:mazinlabs:20181216233515p:plain
BigQueryで集計したユーザごとの発言数

BigQueryから情報を出す際にやったこと

1. SlackのBOT用のトークンを取得

SlackのページからBOT用のトークンを取得してくる

  • SlackアプリのOAuth & Permissionsのページに移動
  • Bot User OAuth Access Token」のトークンを取得

2. Cloud Functionsの設定

Cloud Schedulerから起動するCloud Functionsを設定

今回は user_weekly_post_count という名前で作成

BigQueryにはユーザIDしか入っていないのでこちらでusers.infoから名前を取得しています。

import json
import os

from datetime import date, timedelta
from flask import make_response
from google.cloud import bigquery
from slackclient import SlackClient

def create_query_string():
    query_string = """
        SELECT user, count(ts) as post 
        FROM `cloudtu-dev-02962067.ogura_bigdata_counter.messages` 
        where TIMESTAMP('{} 00:00:00') <= ts and ts < TIMESTAMP('{} 00:00:00')
        group by user
        order by post desc
        LIMIT 10"""
    
    today = date.today()
    start = (today - timedelta(days=8)).isoformat()
    end = (today - timedelta(days=1)).isoformat()
    return (start, end, query_string.format(table_name, start, end))
   
def query_user_weekly_post_count(request):    
    request_json = request.get_json()
    if not (request_json and 'key' in request_json):
        return make_response("Authentication error", 403)
    
    client = bigquery.Client()
    start, end, query = create_query_string()
    query_job = client.query(query)

    results = query_job.result()  # Waits for job to complete.

    slack_token = os.environ["SLACK_API_TOKEN"]
    key = os.environ["KEY"]
    sc = SlackClient(slack_token)
    result = '1週間({}~{})のPost数ランキング\n'.format(start, end)
    rank = 0
    for row in results:
        user = sc.api_call(
            "users.info",
            user=row.user
        )
        user_name = user["user"]["name"]
        rank += 1
        result += '{} : {} ({} posts)\n'.format(rank, user_name, row.post)
        
    sc.api_call(
        "chat.postMessage",
        channel="general",
        text=result
    )    
    return make_response(result, 200)
flask
google-cloud-bigquery
slackclient

3. Cloud Schedulerの設定

先月11月にCloud Schedulerというスケーラブルなcronのサービスが出ていたのでGAEなどを使わずCloud SchedulerからCloud Functionsが多々けるのではないかと思い試してみた。

なお、Cloud Schedulerはベータサービスということもあってか のWeb GUIではBodyの設定をしてもうまく反映されないことがわかったので、CLIから操作した。

コードは以下の通りで、URIとmessage-bodyは各自で設定が必要。

#!/bin/bash
gcloud beta scheduler jobs create http user_weekly_post_ranking    \
  --schedule="0 9 * * 1" \
  --uri="https://asia-northeast1-$PROJECT_ID.cloudfunctions.net/user_weekly_post_count" \
  --time-zone="Asia/Tokyo" \
  --headers="Content-Type=application/json" \
  --message-body="{\"key\":\"$KEY\"}"

4. 動作確認

Cloud Schedulerに「Run now」のボタンがあるので押して動作を確認。

f:id:mazinlabs:20181217002014p:plain
動作確認結果

まとめ

SlackのEvent APIGCPのサービスを利用することで簡単にSlackデータを蓄積を実現し、また定期的にSlackに統計データを通知する事が可能になりました。

ただ、Dataflowは利用は無料枠がなく使い始めると即お金がかかるのでもう少し安くするためにはCloud FunctionsからストリームでBigQueryにデータを入れる or 一時的に何処かに蓄積したものをBigQueryに投入するなどの工夫が必要になってくると思います。なので、また何か改善した際はブログに書きたいと思います。