From d94a8dbc7e9ce6ceb0008572b0a27cf477864c26 Mon Sep 17 00:00:00 2001
From: Uku Taht <Uku.taht@gmail.com>
Date: Thu, 21 May 2020 13:03:39 +0300
Subject: [PATCH] Clickhouse (#66)

* Get stats from clickhosue

* Pull stats from clickhouse

* Use correct Query namespace

* Use Clickhouse in unit tests

* Use Clickhouse in stats controller tests

* Use fixtures for unit tests

* Add Clickhouse to travis

* Use Clickhouse session store for sessions

* Add garbage collection to session store

* Reload session state from Clickhouse on server restart

* Query from sessions table

* Trap exits in event write buffer

* Run hydration without starting the whole app

* Make session length 30 minutes

* Revert changes to fingerprint schema

* Remove clickhouse from fingerprint sessions

* Flush buffers before shutdown

* Use old stats when merging

* Remove old session schema

* Fix tests with CH sessions

* Add has_pageviews? to Stats

* Use CH in staging

* Update schema

* Fix test setup
---
 lib/mix/tasks/hydrate_clickhouse.ex           | 72 ++++++++++---------
 lib/plausible/clickhouse.ex                   |  4 +-
 lib/plausible/event/clickhouse_schema.ex      | 30 ++++++++
 lib/plausible/session/clickhouse_schema.ex    |  9 ++-
 lib/plausible/stats/clickhouse.ex             | 27 +++----
 .../controllers/api/stats_controller.ex       |  2 +-
 .../controllers/stats_controller.ex           |  2 +-
 mix.exs                                       |  1 +
 mix.lock                                      |  1 +
 test/support/factory.ex                       |  5 +-
 10 files changed, 101 insertions(+), 52 deletions(-)
 create mode 100644 lib/plausible/event/clickhouse_schema.ex

diff --git a/lib/mix/tasks/hydrate_clickhouse.ex b/lib/mix/tasks/hydrate_clickhouse.ex
index 76c01fab..9279e107 100644
--- a/lib/mix/tasks/hydrate_clickhouse.ex
+++ b/lib/mix/tasks/hydrate_clickhouse.ex
@@ -2,6 +2,7 @@ defmodule Mix.Tasks.HydrateClickhouse do
   use Mix.Task
   use Plausible.Repo
   require Logger
+  @hash_key Keyword.fetch!(Application.get_env(:plausible, PlausibleWeb.Endpoint), :secret_key_base) |> binary_part(0, 16)
 
   def run(args) do
     Application.ensure_all_started(:db_connection)
@@ -21,24 +22,25 @@ defmodule Mix.Tasks.HydrateClickhouse do
 
   def create_events() do
     ddl = """
-    CREATE TABLE IF NOT EXISTS events (
+    CREATE TABLE events (
       timestamp DateTime,
       name String,
       domain String,
-      user_id FixedString(64),
+      user_id UInt64,
+      session_id UInt64,
       hostname String,
       pathname String,
-      referrer Nullable(String),
-      referrer_source Nullable(String),
-      initial_referrer Nullable(String),
-      initial_referrer_source Nullable(String),
-      country_code Nullable(FixedString(2)),
-      screen_size Nullable(String),
-      operating_system Nullable(String),
-      browser Nullable(String)
+      referrer String,
+      referrer_source String,
+      initial_referrer String,
+      initial_referrer_source String,
+      country_code LowCardinality(FixedString(2)),
+      screen_size LowCardinality(String),
+      operating_system LowCardinality(String),
+      browser LowCardinality(String)
     ) ENGINE = MergeTree()
     PARTITION BY toYYYYMM(timestamp)
-    ORDER BY (name, domain, timestamp, user_id)
+    ORDER BY (name, domain, user_id, timestamp)
     SETTINGS index_granularity = 8192
     """
 
@@ -48,29 +50,29 @@ defmodule Mix.Tasks.HydrateClickhouse do
 
   def create_sessions() do
     ddl = """
-    CREATE TABLE IF NOT EXISTS sessions (
-      session_id UUID,
+    CREATE TABLE sessions (
+      session_id UInt64,
       sign Int8,
       domain String,
-      user_id FixedString(64),
+      user_id UInt64,
       hostname String,
       timestamp DateTime,
       start DateTime,
       is_bounce UInt8,
-      entry_page Nullable(String),
-      exit_page Nullable(String),
+      entry_page String,
+      exit_page String,
       pageviews Int32,
       events Int32,
       duration UInt32,
-      referrer Nullable(String),
-      referrer_source Nullable(String),
-      country_code Nullable(FixedString(2)),
-      screen_size Nullable(String),
-      operating_system Nullable(String),
-      browser Nullable(String)
+      referrer String,
+      referrer_source String,
+      country_code LowCardinality(FixedString(2)),
+      screen_size LowCardinality(String),
+      operating_system LowCardinality(String),
+      browser LowCardinality(String)
     ) ENGINE = CollapsingMergeTree(sign)
     PARTITION BY toYYYYMM(start)
-    ORDER BY (domain, start, user_id, session_id)
+    ORDER BY (domain, user_id, session_id, start)
     SETTINGS index_granularity = 8192
     """
 
@@ -99,27 +101,33 @@ defmodule Mix.Tasks.HydrateClickhouse do
     event_chunks = from(e in Plausible.Event, where: e.domain == "plausible.io", order_by: e.id) |> chunk_query(10_000, repo)
 
     Enum.reduce(event_chunks, %{}, fn events, session_cache ->
-      {session_cache, sessions} = Enum.reduce(events, {session_cache, []}, fn event, {session_cache, sessions} ->
+      {session_cache, sessions, events} = Enum.reduce(events, {session_cache, [], []}, fn event, {session_cache, sessions, new_events} ->
         found_session = session_cache[event.fingerprint]
         active = is_active?(found_session, event)
+        user_id = SipHash.hash!(@hash_key, event.fingerprint)
+        clickhouse_event = struct(Plausible.ClickhouseEvent, Map.from_struct(event) |> Map.put(:user_id, user_id))
+
         cond do
           found_session && active ->
-            new_session = update_session(found_session, event)
+            new_session = update_session(found_session, clickhouse_event)
             {
               Map.put(session_cache, event.fingerprint, new_session),
-              [%{new_session | sign: 1}, %{found_session | sign: -1} | sessions]
+              [%{new_session | sign: 1}, %{found_session | sign: -1} | sessions],
+              new_events ++ [%{clickhouse_event | session_id: new_session.session_id}]
             }
           found_session && !active ->
-            new_session = new_session_from_event(event)
+            new_session = new_session_from_event(clickhouse_event)
             {
               Map.put(session_cache, event.fingerprint, new_session),
-              [new_session | sessions]
+              [new_session | sessions],
+              new_events ++ [%{clickhouse_event | session_id: new_session.session_id}]
             }
           true ->
-            new_session = new_session_from_event(event)
+            new_session = new_session_from_event(clickhouse_event)
             {
               Map.put(session_cache, event.fingerprint, new_session),
-              [new_session | sessions]
+              [new_session | sessions],
+              new_events ++ [%{clickhouse_event | session_id: new_session.session_id}]
             }
         end
       end)
@@ -141,10 +149,10 @@ defmodule Mix.Tasks.HydrateClickhouse do
   defp new_session_from_event(event) do
     %Plausible.ClickhouseSession{
       sign: 1,
-      session_id: UUID.uuid4(),
+      session_id: Plausible.ClickhouseSession.random_uint64(),
       hostname: event.hostname,
       domain: event.domain,
-      user_id: event.fingerprint,
+      user_id: event.user_id,
       entry_page: event.pathname,
       exit_page: event.pathname,
       is_bounce: true,
diff --git a/lib/plausible/clickhouse.ex b/lib/plausible/clickhouse.ex
index 3e83cc37..87cc106a 100644
--- a/lib/plausible/clickhouse.ex
+++ b/lib/plausible/clickhouse.ex
@@ -16,7 +16,7 @@ defmodule Plausible.Clickhouse do
     """ <> String.duplicate(" (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?),", length(events))
 
     args = Enum.reduce(events, [], fn event, acc ->
-      [event.name, event.timestamp, event.domain, event.fingerprint, event.hostname, escape_quote(event.pathname), event.referrer, event.referrer_source, event.initial_referrer, event.initial_referrer_source, event.country_code, event.screen_size, event.browser, event.operating_system] ++ acc
+      [event.name, event.timestamp, event.domain, event.user_id, event.hostname, escape_quote(event.pathname), event.referrer || "", event.referrer_source || "", event.initial_referrer || "", event.initial_referrer_source || "", event.country_code || "", event.screen_size || "", event.browser || "", event.operating_system || ""] ++ acc
     end)
 
     Clickhousex.query(:clickhouse, insert, args, log: {Plausible.Clickhouse, :log, []})
@@ -29,7 +29,7 @@ defmodule Plausible.Clickhouse do
     """ <> String.duplicate(" (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?),", Enum.count(sessions))
 
     args = Enum.reduce(sessions, [], fn session, acc ->
-      [session.sign, session.session_id, session.domain, session.user_id, session.timestamp, session.hostname, session.start, session.is_bounce && 1 || 0, session.entry_page, session.exit_page, session.events, session.pageviews, session.duration, session.referrer, session.referrer_source,session.country_code, session.screen_size, session.browser, session.operating_system] ++ acc
+      [session.sign, session.session_id, session.domain, session.user_id, session.timestamp, session.hostname, session.start, session.is_bounce && 1 || 0, session.entry_page, session.exit_page, session.events, session.pageviews, session.duration, session.referrer || "", session.referrer_source || "", session.country_code || "", session.screen_size || "", session.browser || "", session.operating_system || ""] ++ acc
     end)
 
     Clickhousex.query(:clickhouse, insert, args, log: {Plausible.Clickhouse, :log, []})
diff --git a/lib/plausible/event/clickhouse_schema.ex b/lib/plausible/event/clickhouse_schema.ex
new file mode 100644
index 00000000..824af164
--- /dev/null
+++ b/lib/plausible/event/clickhouse_schema.ex
@@ -0,0 +1,30 @@
+defmodule Plausible.ClickhouseEvent do
+  use Ecto.Schema
+  import Ecto.Changeset
+
+  schema "events" do
+    field :name, :string
+    field :domain, :string
+    field :hostname, :string
+    field :pathname, :string
+    field :user_id, :integer
+    field :session_id, :integer
+
+    field :referrer, :string
+    field :referrer_source, :string
+    field :initial_referrer, :string
+    field :initial_referrer_source, :string
+    field :country_code, :string
+    field :screen_size, :string
+    field :operating_system, :string
+    field :browser, :string
+
+    timestamps(inserted_at: :timestamp, updated_at: false)
+  end
+
+  def changeset(pageview, attrs) do
+    pageview
+    |> cast(attrs, [:name, :domain, :hostname, :pathname, :user_id, :operating_system, :browser, :referrer, :referrer_source, :initial_referrer, :initial_referrer_source, :country_code, :screen_size])
+    |> validate_required([:name, :domain, :hostname, :pathname, :user_id])
+  end
+end
diff --git a/lib/plausible/session/clickhouse_schema.ex b/lib/plausible/session/clickhouse_schema.ex
index e7b0cd2d..9e2c7c20 100644
--- a/lib/plausible/session/clickhouse_schema.ex
+++ b/lib/plausible/session/clickhouse_schema.ex
@@ -2,11 +2,12 @@ defmodule Plausible.ClickhouseSession do
   use Ecto.Schema
   import Ecto.Changeset
 
-  @primary_key {:session_id, :binary_id, autogenerate: false}
+  @primary_key false
   schema "sessions" do
     field :hostname, :string
     field :domain, :string
-    field :user_id, :string
+    field :user_id, :integer
+    field :session_id, :integer
 
     field :start, :naive_datetime
     field :duration, :integer
@@ -26,6 +27,10 @@ defmodule Plausible.ClickhouseSession do
     field :timestamp, :naive_datetime
   end
 
+  def random_uint64() do
+    :crypto.strong_rand_bytes(8) |> :binary.decode_unsigned()
+  end
+
   def changeset(session, attrs) do
     session
     |> cast(attrs, [:hostname, :domain, :entry_page, :exit_page, :referrer, :fingerprint, :start, :length, :is_bounce, :operating_system, :browser, :referrer_source, :country_code, :screen_size])
diff --git a/lib/plausible/stats/clickhouse.ex b/lib/plausible/stats/clickhouse.ex
index ee00debb..347ef435 100644
--- a/lib/plausible/stats/clickhouse.ex
+++ b/lib/plausible/stats/clickhouse.ex
@@ -155,24 +155,28 @@ defmodule Plausible.Stats.Clickhouse do
       from e in base_query(site, query),
       select: {fragment("? as name", e.initial_referrer_source), fragment("min(?) as url", e.initial_referrer), fragment("uniq(user_id) as count")},
       group_by: e.initial_referrer_source,
-      where: not is_nil(e.initial_referrer_source),
+      where: e.initial_referrer_source != "",
       order_by: [desc: fragment("count")],
       limit: ^limit
     ) |> Enum.map(fn ref ->
-      Map.update(ref, :url, nil, fn url -> url && URI.parse("http://" <> url).host end)
+      ref
+      |> Map.update("url", nil, fn url -> url && URI.parse("http://" <> url).host end)
+      |> Map.update("name", nil, fn name -> if name == "", do: "(no referrer)", else: name end)
     end)
   end
 
   def top_referrers(site, query, limit \\ 5, include \\ []) do
     referrers = Clickhouse.all(
       from e in base_query(site, query),
-      select: {fragment("? as name", e.referrer_source), fragment("min(?) as url", e.referrer), fragment("uniq(user_id) as count")},
+      select: {fragment("? as name", e.referrer_source), fragment("any(?) as url", e.referrer), fragment("uniq(user_id) as count")},
       group_by: e.referrer_source,
-      where: not is_nil(e.referrer_source),
+      where: e.referrer_source != "",
       order_by: [desc: fragment("count")],
       limit: ^limit
     ) |> Enum.map(fn ref ->
-      Map.update(ref, :url, nil, fn url -> url && URI.parse("http://" <> url).host end)
+      ref
+      |> Map.update("url", nil, fn url -> url && URI.parse("http://" <> url).host end)
+      |> Map.update("name", nil, fn name -> if name == "", do: "(no referrer)", else: name end)
     end)
 
     if "bounce_rate" in include do
@@ -191,7 +195,7 @@ defmodule Plausible.Stats.Clickhouse do
       select: {s.referrer_source, fragment("count(*) as total"), fragment("round(sum(is_bounce * sign) / sum(sign) * 100) as bounce_rate")},
       where: s.domain == ^site.domain,
       where: s.start >= ^first_datetime and s.start < ^last_datetime,
-      where: not is_nil(s.referrer_source),
+      where: s.referrer_source != "",
       group_by: s.referrer_source,
       order_by: [desc: fragment("total")],
       limit: 100
@@ -269,7 +273,7 @@ defmodule Plausible.Stats.Clickhouse do
       select: {s.referrer, fragment("count(*) as total"), fragment("round(sum(is_bounce * sign) / sum(sign) * 100) as bounce_rate")},
       where: s.domain == ^site.domain,
       where: s.start >= ^first_datetime and s.start < ^last_datetime,
-      where: not is_nil(s.referrer),
+      where: s.referrer != "",
       group_by: s.referrer,
       order_by: [desc: fragment("total")],
       limit: 100
@@ -302,7 +306,6 @@ defmodule Plausible.Stats.Clickhouse do
       select: {s.entry_page, fragment("count(*) as total"), fragment("round(sum(is_bounce * sign) / sum(sign) * 100) as bounce_rate")},
       where: s.domain == ^site.domain,
       where: s.start >= ^first_datetime and s.start < ^last_datetime,
-      where: not is_nil(s.entry_page),
       group_by: s.entry_page,
       order_by: [desc: fragment("total")],
       limit: 100
@@ -324,7 +327,7 @@ defmodule Plausible.Stats.Clickhouse do
       from e in base_query(site, query),
       select: {fragment("? as name", e.screen_size), fragment("uniq(user_id) as count")},
       group_by: e.screen_size,
-      where: not is_nil(e.screen_size)
+      where: e.screen_size != ""
     )
     |> Enum.sort(fn %{"name" => screen_size1}, %{"name" => screen_size2} ->
       index1 = Enum.find_index(@available_screen_sizes, fn s -> s == screen_size1 end)
@@ -339,7 +342,7 @@ defmodule Plausible.Stats.Clickhouse do
       from e in base_query(site, query),
       select: {fragment("? as name", e.country_code), fragment("uniq(user_id) as count")},
       group_by: e.country_code,
-      where: not is_nil(e.country_code),
+      where: e.country_code != "",
       order_by: [desc: fragment("count")]
     )
     |> Enum.map(fn stat ->
@@ -356,7 +359,7 @@ defmodule Plausible.Stats.Clickhouse do
       from e in base_query(site, query),
       select: {fragment("? as name", e.browser), fragment("uniq(user_id) as count")},
       group_by: e.browser,
-      where: not is_nil(e.browser),
+      where: e.browser != "",
       order_by: [desc: fragment("count")]
     )
     |> add_percentages
@@ -368,7 +371,7 @@ defmodule Plausible.Stats.Clickhouse do
       from e in base_query(site, query),
       select: {fragment("? as name", e.operating_system), fragment("uniq(user_id) as count")},
       group_by: e.operating_system,
-      where: not is_nil(e.operating_system),
+      where: e.operating_system != "",
       order_by: [desc: fragment("count")]
     )
     |> add_percentages
diff --git a/lib/plausible_web/controllers/api/stats_controller.ex b/lib/plausible_web/controllers/api/stats_controller.ex
index aed9b38f..9aebbbfc 100644
--- a/lib/plausible_web/controllers/api/stats_controller.ex
+++ b/lib/plausible_web/controllers/api/stats_controller.ex
@@ -1,7 +1,7 @@
 defmodule PlausibleWeb.Api.StatsController do
   use PlausibleWeb, :controller
   use Plausible.Repo
-  alias Plausible.Stats
+  alias Plausible.Stats.Clickhouse, as: Stats
   alias Plausible.Stats.Query
   plug PlausibleWeb.AuthorizeStatsPlug
 
diff --git a/lib/plausible_web/controllers/stats_controller.ex b/lib/plausible_web/controllers/stats_controller.ex
index 4732f1ae..42c5401f 100644
--- a/lib/plausible_web/controllers/stats_controller.ex
+++ b/lib/plausible_web/controllers/stats_controller.ex
@@ -1,7 +1,7 @@
 defmodule PlausibleWeb.StatsController do
   use PlausibleWeb, :controller
   use Plausible.Repo
-  alias Plausible.Stats
+  alias Plausible.Stats.Clickhouse, as: Stats
   alias Plausible.Stats.Query
 
   plug PlausibleWeb.AuthorizeStatsPlug when action in [:stats, :csv_export]
diff --git a/mix.exs b/mix.exs
index 1a3c56a7..692d2336 100644
--- a/mix.exs
+++ b/mix.exs
@@ -63,6 +63,7 @@ defmodule Plausible.MixProject do
       {:csv, "~> 2.3"},
       {:oauther, "~> 1.1"},
       {:nanoid, "~> 2.0.2"},
+      {:siphash, "~> 3.2"},
       {:clickhousex, [git: "https://github.com/atlas-forks/clickhousex.git"]}
     ]
   end
diff --git a/mix.lock b/mix.lock
index 93fba55c..6cb18970 100644
--- a/mix.lock
+++ b/mix.lock
@@ -55,6 +55,7 @@
   "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
   "ref_inspector": {:hex, :ref_inspector, "1.3.0", "a02b89647440d084f2867ecece7a99895bcd4683482397fe086508bb22a165f3", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:yamerl, "~> 0.7", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "d2069ae6b371112ac696a3cd116fd1e08d5726249b8d1357f377e67f0716cc10"},
   "sentry": {:hex, :sentry, "7.2.4", "b5bc90b594d40c2e653581e797a5fd2fdf994f2568f6bd66b7fa4971598be8d5", [:mix], [{:hackney, "~> 1.8 or 1.6.5", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix, "~> 1.3", [hex: :phoenix, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm", "4ee4d368b5013076afcc8b73ed028bdc8ee9db84ea987e3591101e194c1fc24b"},
+  "siphash": {:hex, :siphash, "3.2.0", "ec03fd4066259218c85e2a4b8eec4bb9663bc02b127ea8a0836db376ba73f2ed", [:make, :mix], [], "hexpm", "ba3810701c6e95637a745e186e8a4899087c3b079ba88fb8f33df054c3b0b7c3"},
   "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"},
   "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
   "timex": {:hex, :timex, "3.6.1", "efdf56d0e67a6b956cc57774353b0329c8ab7726766a11547e529357ffdc1d56", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5 or ~> 1.0.0", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "f354efb2400dd7a80fd9eb6c8419068c4f632da4ac47f3d8822d6e33f08bc852"},
diff --git a/test/support/factory.ex b/test/support/factory.ex
index 360da0a2..06fa68b6 100644
--- a/test/support/factory.ex
+++ b/test/support/factory.ex
@@ -1,5 +1,6 @@
 defmodule Plausible.Factory do
   use ExMachina.Ecto, repo: Plausible.Repo
+  @hash_key Keyword.fetch!(Application.get_env(:plausible, PlausibleWeb.Endpoint), :secret_key_base) |> binary_part(0, 16)
 
   def user_factory(attrs) do
     pw = Map.get(attrs, :password, "password")
@@ -67,12 +68,12 @@ defmodule Plausible.Factory do
   def event_factory do
     hostname = sequence(:domain, &"example-#{&1}.com")
 
-    %Plausible.Event{
+    %Plausible.ClickhouseEvent{
       hostname: hostname,
       domain: hostname,
       pathname: "/",
       timestamp: Timex.now(),
-      fingerprint: UUID.uuid4()
+      user_id: SipHash.hash!(@hash_key, UUID.uuid4())
     }
   end
 
-- 
GitLab