diff --git a/config/dev.exs b/config/dev.exs index e1fb676a4b8bbcecb3edab374c1fd54e32957016..f8be6ec56056c726a10083bcbc08eb6abbd9df81 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -35,6 +35,11 @@ config :logger, :console, format: "[$level] $message\n" config :phoenix, :stacktrace_depth, 20 config :phoenix, :plug_init_mode, :runtime +config :plausible, :clickhouse, + hostname: "localhost", + database: "plausible_dev", + pool_size: 10 + config :plausible, Plausible.Repo, username: "postgres", password: "postgres", diff --git a/config/prod.exs b/config/prod.exs index 31ea437666ff1b4e01332bb157cb54e2178adc39..3691c77cfd7dbc25c12189affd77d705ea777f01 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -71,6 +71,13 @@ config :logger, level: :info config :plausible, PlausibleWeb.Endpoint, secret_key_base: System.get_env("SECRET_KEY_BASE") +config :plausible, :clickhouse, + hostname: System.get_env("CLICKHOUSE_DATABASE_HOST"), + database: System.get_env("CLICKHOUSE_DATABASE_NAME"), + username: System.get_env("CLICKHOUSE_DATABASE_USER"), + password: System.get_env("CLICKHOUSE_DATABASE_PASSWORD"), + pool_size: 30 + # Configure your database config :plausible, Plausible.Repo, adapter: Ecto.Adapters.Postgres, diff --git a/config/test.exs b/config/test.exs index d0ca5221d40d4823d238f38748d54c5328195dc3..337864fe13bfb181cadba3efad5490397aa28245 100644 --- a/config/test.exs +++ b/config/test.exs @@ -20,6 +20,11 @@ config :plausible, Plausible.Repo, hostname: "localhost", pool: Ecto.Adapters.SQL.Sandbox +config :plausible, :clickhouse, + hostname: "localhost", + database: "plausible_test", + pool_size: 10 + config :plausible, Plausible.Mailer, adapter: Bamboo.TestAdapter diff --git a/lib/mix/tasks/hydrate_clickhouse.ex b/lib/mix/tasks/hydrate_clickhouse.ex new file mode 100644 index 0000000000000000000000000000000000000000..4e2da765607161730fbb3b1346e653dd329a7edd --- /dev/null +++ b/lib/mix/tasks/hydrate_clickhouse.ex @@ -0,0 +1,126 @@ +defmodule Mix.Tasks.HydrateClickhouse do + use Mix.Task + use Plausible.Repo + require Logger + + def run(args) do + Application.ensure_all_started(:plausible) + execute(args) + end + + def execute(_args \\ []) do + create_events() + create_sessions() + hydrate_sessions() + hydrate_events() + end + + def create_events() do + ddl = """ + CREATE TABLE IF NOT EXISTS events ( + timestamp DateTime, + name String, + domain String, + user_id FixedString(64), + hostname String, + pathname String, + referrer Nullable(String), + referrer_source Nullable(String), + initial_referrer Nullable(String), + initial_referrer_source Nullable(String), + country_code Nullable(FixedString(2)), + screen_size Nullable(String), + operating_system Nullable(String), + browser Nullable(String) + ) ENGINE = MergeTree() + PARTITION BY toYYYYMM(timestamp) + ORDER BY (name, domain, timestamp, user_id) + SETTINGS index_granularity = 8192 + """ + + Clickhousex.query(:clickhouse, ddl, []) + |> log + end + + def create_sessions() do + ddl = """ + CREATE TABLE IF NOT EXISTS sessions ( + domain String, + user_id FixedString(64), + hostname String, + start DateTime, + is_bounce UInt8, + entry_page Nullable(String), + exit_page Nullable(String), + referrer Nullable(String), + referrer_source Nullable(String), + country_code Nullable(FixedString(2)), + screen_size Nullable(String), + operating_system Nullable(String), + browser Nullable(String) + ) ENGINE = MergeTree() + PARTITION BY toYYYYMM(start) + ORDER BY (domain, start, user_id) + SETTINGS index_granularity = 8192 + """ + + Clickhousex.query(:clickhouse, ddl, []) + |> log + end + + def chunk_query(queryable, chunk_size) do + chunk_stream = Stream.unfold(0, fn page_number -> + offset = chunk_size * page_number + page = from( + q in queryable, + offset: ^offset, + limit: ^chunk_size + ) |> Repo.all(timeout: :infinity) + {page, page_number + 1} + end) + Stream.take_while(chunk_stream, fn [] -> false; _ -> true end) + end + + def escape_quote(s) do + String.replace(s, "'", "''") + end + + def hydrate_events(_args \\ []) do + event_chunks = from(e in Plausible.Event, order_by: e.id) |> chunk_query(10_000) + + for chunk <- event_chunks do + insert = """ + INSERT INTO events (name, timestamp, domain, user_id, hostname, pathname, referrer, referrer_source, initial_referrer, initial_referrer_source, country_code, screen_size, browser, operating_system) + VALUES + """ <> String.duplicate(" (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?),", Enum.count(chunk)) + + args = Enum.reduce(chunk, [], fn event, acc -> + acc ++ [event.name, event.timestamp, event.domain, event.fingerprint, event.hostname, escape_quote(event.pathname), event.referrer, event.referrer_source, event.initial_referrer, event.initial_referrer_source, event.country_code, event.screen_size, event.browser, event.operating_system] + end) + + Clickhousex.query(:clickhouse, insert, args) + |> log + end + end + + def hydrate_sessions(_args \\ []) do + session_chunks = Repo.all(from e in Plausible.FingerprintSession, order_by: e.id) |> chunk_query(10_000) + + for chunk <- session_chunks do + insert = """ + INSERT INTO sessions (domain, user_id, hostname, start, is_bounce, entry_page, exit_page, referrer, referrer_source, country_code, screen_size, browser, operating_system) + VALUES + """ <> String.duplicate(" (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?),", Enum.count(chunk)) + + args = Enum.reduce(chunk, [], fn session, acc -> + acc ++ [session.domain, session.fingerprint, session.hostname, session.start, session.is_bounce && 1 || 0, session.entry_page, session.exit_page, session.referrer, session.referrer_source,session.country_code, session.screen_size, session.browser, session.operating_system] + end) + + Clickhousex.query(:clickhouse, insert, args) + |> log + end + end + + defp log({:ok, res}), do: Logger.info("#{inspect res}") + defp log({:error, e}), do: Logger.error("[ERROR] #{inspect e}") +end diff --git a/lib/plausible/application.ex b/lib/plausible/application.ex index 14186bdeb6f38d1e5f03672df1e8d5cb5775ff34..c215600b3ba2bc57a20cc3f45a626dab8e072c1d 100644 --- a/lib/plausible/application.ex +++ b/lib/plausible/application.ex @@ -4,9 +4,13 @@ defmodule Plausible.Application do use Application def start(_type, _args) do + clickhouse_config = Application.get_env(:plausible, :clickhouse) children = [ Plausible.Repo, - PlausibleWeb.Endpoint + PlausibleWeb.Endpoint, + Plausible.Event.WriteBuffer, + Plausible.Session.WriteBuffer, + Clickhousex.child_spec(Keyword.merge([scheme: :http, port: 8123, name: :clickhouse], clickhouse_config)) ] opts = [strategy: :one_for_one, name: Plausible.Supervisor] diff --git a/lib/plausible/event/write_buffer.ex b/lib/plausible/event/write_buffer.ex new file mode 100644 index 0000000000000000000000000000000000000000..dae94fa5d18a552e61e6d59987b3f6496fa83b53 --- /dev/null +++ b/lib/plausible/event/write_buffer.ex @@ -0,0 +1,61 @@ +defmodule Plausible.Event.WriteBuffer do + use GenServer + require Logger + @flush_interval_ms 1000 + @max_buffer_size 10_000 + + def start_link(_opts) do + GenServer.start_link(__MODULE__, [], name: __MODULE__) + end + + def init(buffer) do + timer = Process.send_after(self(), :tick, @flush_interval_ms) + {:ok, %{buffer: buffer, timer: timer}} + end + + def insert(event) do + GenServer.cast(__MODULE__, {:insert, event}) + {:ok, event} + end + + def handle_cast({:insert, event}, %{buffer: buffer} = state) do + new_buffer = [ event | buffer ] + + if length(new_buffer) >= @max_buffer_size do + Logger.debug("Buffer full, flushing to disk") + Process.cancel_timer(state[:timer]) + flush(new_buffer) + new_timer = Process.send_after(self(), :tick, @flush_interval_ms) + {:noreply, %{buffer: [], timer: new_timer}} + else + {:noreply, %{state | buffer: new_buffer}} + end + end + + def handle_info(:tick, %{buffer: buffer}) do + flush(buffer) + timer = Process.send_after(self(), :tick, @flush_interval_ms) + {:noreply, %{buffer: [], timer: timer}} + end + + defp flush(buffer) do + case buffer do + [] -> nil + events -> insert_events(events) + end + end + + defp insert_events(events) do + Logger.debug("Flushing #{length(events)} events") + insert = """ + INSERT INTO events (name, timestamp, domain, user_id, hostname, pathname, referrer, referrer_source, initial_referrer, initial_referrer_source, country_code, screen_size, browser, operating_system) + VALUES + """ <> String.duplicate(" (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?),", length(events)) + + args = Enum.reduce(events, [], fn event, acc -> + [event.name, event.timestamp, event.domain, event.fingerprint, event.hostname, event.pathname, event.referrer, event.referrer_source, event.initial_referrer, event.initial_referrer_source, event.country_code, event.screen_size, event.browser, event.operating_system] ++ acc + end) + + Clickhousex.query(:clickhouse, insert, args, log: {Plausible.Stats, :log, []}) + end +end diff --git a/lib/plausible/ingest/fingerprint_session.ex b/lib/plausible/ingest/fingerprint_session.ex index 864fe598ac975724ed7662415ba046bcff19d1c3..09cfcb6a663610058acef4fad11c25e61a2a5d12 100644 --- a/lib/plausible/ingest/fingerprint_session.ex +++ b/lib/plausible/ingest/fingerprint_session.ex @@ -46,7 +46,7 @@ defmodule Plausible.Ingest.FingerprintSession do Timex.diff(state[:last_unload], first_event.timestamp, :seconds) end - Plausible.FingerprintSession.changeset(%Plausible.FingerprintSession{}, %{ + changeset = Plausible.FingerprintSession.changeset(%Plausible.FingerprintSession{}, %{ hostname: first_event.hostname, domain: first_event.domain, fingerprint: first_event.fingerprint, @@ -60,7 +60,10 @@ defmodule Plausible.Ingest.FingerprintSession do operating_system: first_event.operating_system, browser: first_event.browser, start: first_event.timestamp - }) |> Repo.insert! + }) + + if changeset.valid? && changeset.data.domain in ["plausible.io", "localtest.me"], do: Plausible.Session.WriteBuffer.insert(changeset.data) + Repo.insert!(changeset) end {:stop, :normal, state} diff --git a/lib/plausible/session/write_buffer.ex b/lib/plausible/session/write_buffer.ex new file mode 100644 index 0000000000000000000000000000000000000000..93ab56b1ddfcfb1c46f7bcae14a0640b2fb4eed4 --- /dev/null +++ b/lib/plausible/session/write_buffer.ex @@ -0,0 +1,61 @@ +defmodule Plausible.Session.WriteBuffer do + use GenServer + require Logger + @flush_interval_ms 1000 + @max_buffer_size 10_000 + + def start_link(_opts) do + GenServer.start_link(__MODULE__, [], name: __MODULE__) + end + + def init(buffer) do + timer = Process.send_after(self(), :tick, @flush_interval_ms) + {:ok, %{buffer: buffer, timer: timer}} + end + + def insert(session) do + GenServer.cast(__MODULE__, {:insert, session}) + {:ok, session} + end + + def handle_cast({:insert, session}, %{buffer: buffer} = state) do + new_buffer = [ session | buffer ] + + if length(new_buffer) >= @max_buffer_size do + Logger.debug("Buffer full, flushing to disk") + Process.cancel_timer(state[:timer]) + flush(new_buffer) + new_timer = Process.send_after(self(), :tick, @flush_interval_ms) + {:noreply, %{buffer: [], timer: new_timer}} + else + {:noreply, %{state | buffer: new_buffer}} + end + end + + def handle_info(:tick, %{buffer: buffer}) do + flush(buffer) + timer = Process.send_after(self(), :tick, @flush_interval_ms) + {:noreply, %{buffer: [], timer: timer}} + end + + defp flush(buffer) do + case buffer do + [] -> nil + sessions -> insert_sessions(sessions) + end + end + + defp insert_sessions(sessions) do + Logger.debug("Flushing #{length(sessions)} sessions") + insert = """ + INSERT INTO sessions (domain, user_id, hostname, start, is_bounce, entry_page, exit_page, referrer, referrer_source, country_code, screen_size, browser, operating_system) + VALUES + """ <> String.duplicate(" (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?),", Enum.count(sessions)) + + args = Enum.reduce(sessions, [], fn session, acc -> + [session.domain, session.fingerprint, session.hostname, session.start, session.is_bounce && 1 || 0, session.entry_page, session.exit_page, session.referrer, session.referrer_source,session.country_code, session.screen_size, session.browser, session.operating_system] ++ acc + end) + + Clickhousex.query(:clickhouse, insert, args, log: {Plausible.Stats, :log, []}) + end +end diff --git a/lib/plausible/stats/clickhouse.ex b/lib/plausible/stats/clickhouse.ex new file mode 100644 index 0000000000000000000000000000000000000000..b6790c172ccf8560ba904885916f6c068b125a1d --- /dev/null +++ b/lib/plausible/stats/clickhouse.ex @@ -0,0 +1,503 @@ +defmodule Plausible.Clickhouse do + use Plausible.Repo + alias Plausible.Stats.Query + + def compare_pageviews_and_visitors(site, query, {pageviews, visitors}) do + query = Query.shift_back(query) + {old_pageviews, old_visitors} = pageviews_and_visitors(site, query) + + cond do + old_pageviews == 0 and pageviews > 0 -> + {100, 100} + old_pageviews == 0 and pageviews == 0 -> + {0, 0} + true -> + { + round((pageviews - old_pageviews) / old_pageviews * 100), + round((visitors - old_visitors) / old_visitors * 100), + } + + end + end + + def calculate_plot(site, %Query{step_type: "month"} = query) do + steps = Enum.map((query.steps - 1)..0, fn shift -> + Timex.now(site.timezone) + |> Timex.beginning_of_month + |> Timex.shift(months: -shift) + |> DateTime.to_date + end) + + groups = clickhouse_all( + from e in base_query(site, %{query | filters: %{}}), + select: {fragment("toStartOfMonth(toTimeZone(?, ?)) as month", e.timestamp, ^site.timezone), fragment("uniq(?) as visitors", e.user_id)}, + group_by: fragment("month"), + order_by: fragment("month") + ) + |> Enum.map(fn row -> {row["month"], row["visitors"]} end) + |> Enum.into(%{}) + + compare_groups = if query.filters["goal"] do + clickhouse_all( + from e in base_query(site, query), + select: {fragment("toStartOfMonth(toTimeZone(?, ?)) as month", e.timestamp, ^site.timezone), fragment("uniq(?) as visitors", e.user_id)}, + group_by: fragment("month"), + order_by: fragment("month") + ) + |> Enum.map(fn row -> {row["month"], row["visitors"]} end) + |> Enum.into(%{}) + end + + present_index = Enum.find_index(steps, fn step -> step == Timex.now(site.timezone) |> Timex.to_date |> Timex.beginning_of_month end) + plot = Enum.map(steps, fn step -> groups[step] || 0 end) + compare_plot = compare_groups && Enum.map(steps, fn step -> compare_groups[step] || 0 end) + labels = Enum.map(steps, fn step -> Timex.format!(step, "{ISOdate}") end) + + {plot, compare_plot, labels, present_index} + end + + def calculate_plot(site, %Query{step_type: "date"} = query) do + steps = Enum.into(query.date_range, []) + + groups = clickhouse_all( + from e in base_query(site, %{ query | filters: %{} }), + select: {fragment("toDate(toTimeZone(?, ?)) as day", e.timestamp, ^site.timezone), fragment("uniq(?) as visitors", e.user_id)}, + group_by: fragment("day"), + order_by: fragment("day") + ) + |> Enum.map(fn row -> {row["day"], row["visitors"]} end) + |> Enum.into(%{}) + + compare_groups = if query.filters["goal"] do + clickhouse_all( + from e in base_query(site, query), + select: {fragment("toDate(toTimeZone(?, ?)) as day", e.timestamp, ^site.timezone), fragment("uniq(?) as visitors", e.user_id)}, + group_by: fragment("day"), + order_by: fragment("day") + ) + |> Enum.map(fn row -> {row["day"], row["visitors"]} end) + |> Enum.into(%{}) + end + + present_index = Enum.find_index(steps, fn step -> step == Timex.now(site.timezone) |> Timex.to_date end) + steps_to_show = if present_index, do: present_index + 1, else: Enum.count(steps) + plot = Enum.map(steps, fn step -> groups[step] || 0 end) |> Enum.take(steps_to_show) + compare_plot = compare_groups && Enum.map(steps, fn step -> compare_groups[step] || 0 end) + labels = Enum.map(steps, fn step -> Timex.format!(step, "{ISOdate}") end) + + {plot, compare_plot, labels, present_index} + end + + def calculate_plot(site, %Query{step_type: "hour"} = query) do + steps = 0..23 + + groups = clickhouse_all( + from e in base_query(site, %{query | filters: %{}}), + select: {fragment("toHour(toTimeZone(?, ?)) as hour", e.timestamp, ^site.timezone), fragment("uniq(?) as visitors", e.user_id)}, + group_by: fragment("hour"), + order_by: fragment("hour") + ) + |> Enum.map(fn row -> {row["hour"], row["visitors"]} end) + |> Enum.into(%{}) + + compare_groups = if query.filters["goal"] do + clickhouse_all( + from e in base_query(site, query), + select: {fragment("toHour(toTimeZone(?, ?)) as hour", e.timestamp, ^site.timezone), fragment("uniq(?) as visitors", e.user_id)}, + group_by: fragment("hour"), + order_by: fragment("hour") + ) + |> Enum.map(fn row -> {row["hour"], row["visitors"]} end) + |> Enum.into(%{}) + end + + now = Timex.now(site.timezone) + is_today = Timex.to_date(now) == query.date_range.first + present_index = is_today && Enum.find_index(steps, fn step -> step == now.hour end) + steps_to_show = if present_index, do: present_index + 1, else: Enum.count(steps) + labels = Enum.map(steps, fn step -> Timex.to_datetime(query.date_range.first) |> Timex.shift(hours: step) |> NaiveDateTime.to_iso8601 end) + plot = Enum.map(steps, fn step -> groups[step] || 0 end) |> Enum.take(steps_to_show) + compare_plot = compare_groups && Enum.map(steps, fn step -> compare_groups[step] || 0 end) + {plot, compare_plot, labels, present_index} + end + + def bounce_rate(site, query) do + {first_datetime, last_datetime} = date_range_utc_boundaries(query.date_range, site.timezone) + + [res] = clickhouse_all( + from s in "sessions", + select: {fragment("round(countIf(is_bounce = 1) / count(*) * 100) as bounce_rate")}, + where: s.domain == ^site.domain, + where: s.start >= ^first_datetime and s.start < ^last_datetime + ) + res["bounce_rate"] || 0 + end + + def pageviews_and_visitors(site, query) do + [res] = clickhouse_all( + from e in base_query(site, query), + select: {fragment("count(*) as pageviews"), fragment("uniq(user_id) as visitors")} + ) + {res["pageviews"], res["visitors"]} + end + + def unique_visitors(site, query) do + [res] = clickhouse_all( + from e in base_query(site, query), + select: fragment("uniq(user_id) as visitors") + ) + res["visitors"] + end + + def top_referrers_for_goal(site, query, limit \\ 5) do + clickhouse_all( + from e in base_query(site, query), + select: {fragment("? as name", e.initial_referrer_source), fragment("min(?) as url", e.initial_referrer), fragment("uniq(user_id) as count")}, + group_by: e.initial_referrer_source, + where: not is_nil(e.initial_referrer_source), + order_by: [desc: fragment("count")], + limit: ^limit + ) |> Enum.map(fn ref -> + Map.update(ref, :url, nil, fn url -> url && URI.parse("http://" <> url).host end) + end) + end + + def top_referrers(site, query, limit \\ 5, include \\ []) do + referrers = clickhouse_all( + from e in base_query(site, query), + select: {fragment("? as name", e.referrer_source), fragment("min(?) as url", e.referrer), fragment("uniq(user_id) as count")}, + group_by: e.referrer_source, + where: not is_nil(e.referrer_source), + order_by: [desc: fragment("count")], + limit: ^limit + ) |> Enum.map(fn ref -> + Map.update(ref, :url, nil, fn url -> url && URI.parse("http://" <> url).host end) + end) + + if "bounce_rate" in include do + bounce_rates = bounce_rates_by_referrer_source(site, query) + Enum.map(referrers, fn referrer -> Map.put(referrer, "bounce_rate", bounce_rates[referrer["name"]]) end) + else + referrers + end + end + + defp bounce_rates_by_referrer_source(site, query) do + {first_datetime, last_datetime} = date_range_utc_boundaries(query.date_range, site.timezone) + + clickhouse_all( + from s in "sessions", + select: {s.referrer_source, fragment("count(*) as total"), fragment("round(countIf(is_bounce = 1) / total * 100) as bounce_rate")}, + where: s.domain == ^site.domain, + where: s.start >= ^first_datetime and s.start < ^last_datetime, + where: not is_nil(s.referrer_source), + group_by: s.referrer_source, + order_by: [desc: fragment("total")], + limit: 100 + ) |> Enum.map(fn row -> {row["referrer_source"], row["bounce_rate"]} end) + |> Enum.into(%{}) + end + + def visitors_from_referrer(site, query, referrer) do + [res] = clickhouse_all( + from e in base_query(site, query), + select: fragment("uniq(user_id) as visitors"), + where: e.referrer_source == ^referrer + ) + res["visitors"] + end + + def conversions_from_referrer(site, query, referrer) do + [res] = clickhouse_all( + from e in base_query(site, query), + select: fragment("uniq(user_id) as visitors"), + where: e.initial_referrer_source == ^referrer + ) + res["visitors"] + end + + def referrer_drilldown(site, query, referrer, include \\ []) do + referring_urls = clickhouse_all( + from e in base_query(site, query), + select: {fragment("? as name", e.referrer), fragment("uniq(user_id) as count")}, + group_by: e.referrer, + where: e.referrer_source == ^referrer, + order_by: [desc: fragment("count")], + limit: 100 + ) + + referring_urls = if "bounce_rate" in include do + bounce_rates = bounce_rates_by_referring_url(site, query) + Enum.map(referring_urls, fn url -> Map.put(url, "bounce_rate", bounce_rates[url["name"]]) end) + else + referring_urls + end + + if referrer == "Twitter" do + urls = Enum.map(referring_urls, &(&1[:name])) + + tweets = Repo.all( + from t in Plausible.Twitter.Tweet, + where: t.link in ^urls + ) |> Enum.group_by(&(&1.link)) + + Enum.map(referring_urls, fn url -> + Map.put(url, :tweets, tweets[url[:name]]) + end) + else + referring_urls + end + end + + def referrer_drilldown_for_goal(site, query, referrer) do + clickhouse_all( + from e in base_query(site, query), + select: {fragment("? as name", e.initial_referrer), fragment("uniq(user_id) as count")}, + group_by: e.initial_referrer, + where: e.initial_referrer_source == ^referrer, + order_by: [desc: fragment("count")], + limit: 100 + ) + end + + defp bounce_rates_by_referring_url(site, query) do + {first_datetime, last_datetime} = date_range_utc_boundaries(query.date_range, site.timezone) + + clickhouse_all( + from s in "sessions", + select: {s.referrer, fragment("count(*) as total"), fragment("round(countIf(is_bounce = 1) / total * 100) as bounce_rate")}, + where: s.domain == ^site.domain, + where: s.start >= ^first_datetime and s.start < ^last_datetime, + where: not is_nil(s.referrer), + group_by: s.referrer, + order_by: [desc: fragment("total")], + limit: 100 + ) |> Enum.map(fn row -> {row["referrer"], row["bounce_rate"]} end) + |> Enum.into(%{}) + end + + def top_pages(site, query, limit \\ 5, include \\ []) do + pages = clickhouse_all( + from e in base_query(site, query), + select: {fragment("? as name", e.pathname), fragment("count(?) as count", e.pathname)}, + group_by: e.pathname, + order_by: [desc: fragment("count")], + limit: ^limit + ) + + if "bounce_rate" in include do + bounce_rates = bounce_rates_by_page_url(site, query) + Enum.map(pages, fn url -> Map.put(url, "bounce_rate", bounce_rates[url["name"]]) end) + else + pages + end + end + + defp bounce_rates_by_page_url(site, query) do + {first_datetime, last_datetime} = date_range_utc_boundaries(query.date_range, site.timezone) + + clickhouse_all( + from s in "sessions", + select: {s.entry_page, fragment("count(*) as total"), fragment("round(countIf(is_bounce = 1) / total * 100) as bounce_rate")}, + where: s.domain == ^site.domain, + where: s.start >= ^first_datetime and s.start < ^last_datetime, + where: not is_nil(s.entry_page), + group_by: s.entry_page, + order_by: [desc: fragment("total")], + limit: 100 + ) |> Enum.map(fn row -> {row["entry_page"], row["bounce_rate"]} end) + |> Enum.into(%{}) + end + + defp add_percentages(stat_list) do + total = Enum.reduce(stat_list, 0, fn %{"count" => count}, total -> total + count end) + Enum.map(stat_list, fn stat -> + Map.put(stat, "percentage", round(stat["count"] / total * 100)) + end) + end + + @available_screen_sizes ["Desktop", "Laptop", "Tablet", "Mobile"] + + def top_screen_sizes(site, query) do + clickhouse_all( + from e in base_query(site, query), + select: {fragment("? as name", e.screen_size), fragment("uniq(user_id) as count")}, + group_by: e.screen_size, + where: not is_nil(e.screen_size) + ) + |> Enum.sort(fn %{"name" => screen_size1}, %{"name" => screen_size2} -> + index1 = Enum.find_index(@available_screen_sizes, fn s -> s == screen_size1 end) + index2 = Enum.find_index(@available_screen_sizes, fn s -> s == screen_size2 end) + index2 > index1 + end) + |> add_percentages + end + + def countries(site, query) do + clickhouse_all( + from e in base_query(site, query), + select: {fragment("? as name", e.country_code), fragment("uniq(user_id) as count")}, + group_by: e.country_code, + where: not is_nil(e.country_code), + order_by: [desc: fragment("count")] + ) + |> Enum.map(fn stat -> + two_letter_code = stat["name"] + stat + |> Map.put("name", Plausible.Stats.CountryName.to_alpha3(two_letter_code)) + |> Map.put("full_country_name", Plausible.Stats.CountryName.from_iso3166(two_letter_code)) + end) + |> add_percentages + end + + def browsers(site, query, limit \\ 5) do + clickhouse_all( + from e in base_query(site, query), + select: {fragment("? as name", e.browser), fragment("uniq(user_id) as count")}, + group_by: e.browser, + where: not is_nil(e.browser), + order_by: [desc: fragment("count")] + ) + |> add_percentages + |> Enum.take(limit) + end + + def operating_systems(site, query, limit \\ 5) do + clickhouse_all( + from e in base_query(site, query), + select: {fragment("? as name", e.operating_system), fragment("uniq(user_id) as count")}, + group_by: e.operating_system, + where: not is_nil(e.operating_system), + order_by: [desc: fragment("count")] + ) + |> add_percentages + |> Enum.take(limit) + end + + def current_visitors(site) do + [res] = clickhouse_all( + from e in "events", + select: fragment("uniq(user_id) as visitors"), + where: e.timestamp >= fragment("now() - INTERVAL 5 MINUTE"), + where: e.domain == ^site.domain + ) + + res["visitors"] + end + + def goal_conversions(site, %Query{filters: %{"goal" => goal}} = query) when is_binary(goal) do + clickhouse_all( + from e in base_query(site, query), + select: {e.name, fragment("uniq(user_id) as count")}, + group_by: e.name, + order_by: [desc: fragment("count")] + ) |> Enum.map(fn row -> %{"name" => goal, "count" => row["count"]} end) + end + + def goal_conversions(site, query) do + goals = Repo.all(from g in Plausible.Goal, where: g.domain == ^site.domain) + fetch_pageview_goals(goals, site, query) + ++ fetch_event_goals(goals, site, query) + |> sort_conversions() + end + + defp fetch_event_goals(goals, site, query) do + events = Enum.map(goals, fn goal -> goal.event_name end) + |> Enum.filter(&(&1)) + + if Enum.count(events) > 0 do + clickhouse_all( + from e in base_query(site, query, events), + select: {e.name, fragment("uniq(user_id) as count")}, + group_by: e.name + ) + else + [] + end + end + + defp fetch_pageview_goals(goals, site, query) do + pages = Enum.map(goals, fn goal -> goal.page_path end) + |> Enum.filter(&(&1)) + + if Enum.count(pages) > 0 do + clickhouse_all( + from e in base_query(site, query), + select: {fragment("concat('Visit ', ?) as name", e.pathname), fragment("uniq(user_id) as count")}, + where: fragment("? in ?", e.pathname, ^pages), + group_by: e.pathname + ) + else + [] + end + end + + defp sort_conversions(conversions) do + Enum.sort_by(conversions, fn conversion -> -conversion["count"] end) + end + + defp base_query(site, query, events \\ ["pageview"]) do + {first_datetime, last_datetime} = date_range_utc_boundaries(query.date_range, site.timezone) + {goal_event, path} = event_name_for_goal(query) + + q = from(e in "events", + where: e.domain == ^site.domain, + where: e.timestamp >= ^first_datetime and e.timestamp < ^last_datetime + ) + + q = if path do + from(e in q, where: e.pathname == ^path) + else + q + end + + if goal_event do + from(e in q, where: e.name == ^goal_event) + else + from(e in q, where: fragment("? IN ?", e.name, ^events)) + end + end + + defp clickhouse_all(query) do + {q, params} = Ecto.Adapters.SQL.to_sql(:all, Repo, query) + q = String.replace(q, ~r/\$[0-9]+/, "?") + res = Clickhousex.query!(:clickhouse, q, params, log: {Plausible.Clickhouse, :log, []}) + Enum.map(res.rows, fn row -> + Enum.zip(res.columns, row) + |> Enum.into(%{}) + end) + end + + def log(query) do + require Logger + timing = System.convert_time_unit(query.connection_time, :native, :millisecond) + Logger.info("Clickhouse query OK db=#{timing}ms") + Logger.debug(fn -> + statement = String.replace(query.query.statement, "\n", " ") + "#{statement} #{inspect query.params}" + end) + end + + defp date_range_utc_boundaries(date_range, timezone) do + {:ok, first} = NaiveDateTime.new(date_range.first, ~T[00:00:00]) + first_datetime = Timex.to_datetime(first, timezone) + |> Timex.Timezone.convert("UTC") + + {:ok, last} = NaiveDateTime.new(date_range.last |> Timex.shift(days: 1), ~T[00:00:00]) + last_datetime = Timex.to_datetime(last, timezone) + |> Timex.Timezone.convert("UTC") + + {first_datetime, last_datetime} + end + + defp event_name_for_goal(query) do + case query.filters["goal"] do + "Visit " <> page -> + {"pageview", page} + goal when is_binary(goal) -> + {goal, nil} + _ -> + {nil, nil} + end + end +end diff --git a/lib/plausible_web/controllers/api/external_controller.ex b/lib/plausible_web/controllers/api/external_controller.ex index 55c82a33dece755a13c89bcb881836aa66bbe827..717575caef8c6939c73ac342d016017129a34c46 100644 --- a/lib/plausible_web/controllers/api/external_controller.ex +++ b/lib/plausible_web/controllers/api/external_controller.ex @@ -48,12 +48,11 @@ defmodule PlausibleWeb.Api.ExternalController do event_attrs = %{ name: params["name"], + timestamp: NaiveDateTime.utc_now(), hostname: strip_www(uri && uri.host), domain: strip_www(params["domain"]) || strip_www(uri && uri.host), - pathname: uri && uri.path, - new_visitor: params["new_visitor"], + pathname: uri && escape_quote(uri.path), country_code: country_code, - user_id: params["uid"], fingerprint: calculate_fingerprint(conn, params), operating_system: ua && os_name(ua), browser: ua && browser_name(ua), @@ -64,8 +63,9 @@ defmodule PlausibleWeb.Api.ExternalController do screen_size: calculate_screen_size(params["screen_width"]) } - Plausible.Event.changeset(%Plausible.Event{}, event_attrs) - |> Plausible.Repo.insert + changeset = Plausible.Event.changeset(%Plausible.Event{}, event_attrs) + if changeset.valid? && changeset.data.domain in ["plausible.io", "localtest.me"], do: Plausible.Event.WriteBuffer.insert(changeset.data) + Plausible.Repo.insert(changeset) end end @@ -143,6 +143,8 @@ defmodule PlausibleWeb.Api.ExternalController do end end + defp escape_quote(s), do: String.replace(s, "'", "''") + defp clean_uri(uri) do uri = URI.parse(String.trim(uri)) if uri.scheme in ["http", "https"] do diff --git a/lib/plausible_web/controllers/api/stats_controller.ex b/lib/plausible_web/controllers/api/stats_controller.ex index 6524b3ae45d55e09adabf2773ddd261608638982..65a8277ebcbd5669879b051e1450d59ae6b25dde 100644 --- a/lib/plausible_web/controllers/api/stats_controller.ex +++ b/lib/plausible_web/controllers/api/stats_controller.ex @@ -69,8 +69,9 @@ defmodule PlausibleWeb.Api.StatsController do site = conn.assigns[:site] query = Stats.Query.from(site.timezone, params) include = if params["include"], do: String.split(params["include"], ","), else: [] + limit = if params["limit"], do: String.to_integer(params["limit"]) - json(conn, Stats.top_referrers(site, query, params["limit"] || 9, include)) + json(conn, Stats.top_referrers(site, query, limit || 9, include)) end def referrers_for_goal(conn, params) do @@ -128,8 +129,9 @@ defmodule PlausibleWeb.Api.StatsController do site = conn.assigns[:site] query = Stats.Query.from(site.timezone, params) include = if params["include"], do: String.split(params["include"], ","), else: [] + limit = if params["limit"], do: String.to_integer(params["limit"]) - json(conn, Stats.top_pages(site, query, params["limit"] || 9, include)) + json(conn, Stats.top_pages(site, query, limit || 9, include)) end def countries(conn, params) do diff --git a/mix.exs b/mix.exs index 2084e193ccf1ed54301f7db618b916f3277d58fc..1a3c56a7b527f08f9a5892a114736c3e3f7306c1 100644 --- a/mix.exs +++ b/mix.exs @@ -62,7 +62,8 @@ defmodule Plausible.MixProject do {:php_serializer, "~> 0.9.0"}, {:csv, "~> 2.3"}, {:oauther, "~> 1.1"}, - {:nanoid, "~> 2.0.2"} + {:nanoid, "~> 2.0.2"}, + {:clickhousex, [git: "https://github.com/atlas-forks/clickhousex.git"]} ] end diff --git a/mix.lock b/mix.lock index d031d7d05525f2ef197f95a0b816a31dc24a278d..93fba55c27cd74d09825f85ce360efc8c00f1c93 100644 --- a/mix.lock +++ b/mix.lock @@ -5,6 +5,8 @@ "bcrypt_elixir": {:hex, :bcrypt_elixir, "2.2.0", "3df902b81ce7fa8867a2ae30d20a1da6877a2c056bfb116fd0bc8a5f0190cea4", [:make, :mix], [{:comeonin, "~> 5.3", [hex: :comeonin, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "762be3fcb779f08207531bc6612cca480a338e4b4357abb49f5ce00240a77d1e"}, "browser": {:hex, :browser, "0.4.4", "bd6436961a6b2299c6cb38d0e49761c1161d869cd0db46369cef2bf6b77c3665", [:mix], [{:plug, "~> 1.2", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "d476ca309d4a4b19742b870380390aabbcb323c1f6f8745e2da2dfd079b4f8d7"}, "certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"}, + "clickhouse_ecto": {:git, "git@github.com:appodeal/clickhouse_ecto.git", "c4fa1c3d2b73e4be698e205ad6e9ace22ac23f7d", []}, + "clickhousex": {:git, "https://github.com/atlas-forks/clickhousex.git", "e010c4eaa6cb6b659e44790a3bea2ec7703ceb31", []}, "combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"}, "comeonin": {:hex, :comeonin, "5.3.1", "7fe612b739c78c9c1a75186ef2d322ce4d25032d119823269d0aa1e2f1e20025", [:mix], [], "hexpm", "d6222483060c17f0977fad1b7401ef0c5863c985a64352755f366aee3799c245"}, "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm", "4a0850c9be22a43af9920a71ab17c051f5f7d45c209e40269a1938832510e4d9"}, @@ -12,7 +14,7 @@ "cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"}, "cowlib": {:hex, :cowlib, "2.8.0", "fd0ff1787db84ac415b8211573e9a30a3ebe71b5cbff7f720089972b2319c8a4", [:rebar3], [], "hexpm", "79f954a7021b302186a950a32869dbc185523d99d3e44ce430cd1f3289f41ed4"}, "csv": {:hex, :csv, "2.3.1", "9ce11eff5a74a07baf3787b2b19dd798724d29a9c3a492a41df39f6af686da0e", [:mix], [{:parallel_stream, "~> 1.0.4", [hex: :parallel_stream, repo: "hexpm", optional: false]}], "hexpm", "86626e1c89a4ad9a96d0d9c638f9e88c2346b89b4ba1611988594ebe72b5d5ee"}, - "db_connection": {:hex, :db_connection, "2.2.1", "caee17725495f5129cb7faebde001dc4406796f12a62b8949f4ac69315080566", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "2b02ece62d9f983fcd40954e443b7d9e6589664380e5546b2b9b523cd0fb59e1"}, + "db_connection": {:hex, :db_connection, "2.2.2", "3bbca41b199e1598245b716248964926303b5d4609ff065125ce98bcd368939e", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "642af240d8a8affb93b4ba5a6fcd2bbcbdc327e1a524b825d383711536f8070c"}, "decimal": {:hex, :decimal, "1.8.1", "a4ef3f5f3428bdbc0d35374029ffcf4ede8533536fa79896dd450168d9acdf3c", [:mix], [], "hexpm", "3cb154b00225ac687f6cbd4acc4b7960027c757a5152b369923ead9ddbca7aec"}, "double": {:hex, :double, "0.7.0", "a7ee4c3488a0acc6d2ad9b69b6c7d3ddf3da2b54488d0f7c2d6ceb3a995887ca", [:mix], [], "hexpm", "f0c387a2266b4452da7bab03598feec11aef8b2acab061ea947dae81bb257329"}, "ecto": {:hex, :ecto, "3.4.2", "6890af71025769bd27ef62b1ed1925cfe23f7f0460bcb3041da4b705215ff23e", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3959b8a83e086202a4bd86b4b5e6e71f9f1840813de14a57d502d3fc2ef7132"}, @@ -33,6 +35,7 @@ "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm", "6cbe761d6a0ca5a31a0931bf4c63204bceb64538e664a8ecf784a9a6f3b875f1"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "myxql": {:hex, :myxql, "0.4.0", "d95582db9e4b4707eb3a6a7002b8869a5240247931775f82d811ad450ca06503", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:geo, "~> 3.3", [hex: :geo, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "27a7ebaf7822cf7c89ea796371e70b942c8ec5e2c4eedcb8cd8e043f014014ad"}, "nanoid": {:hex, :nanoid, "2.0.2", "f3f7b4bf103ab6667f22beb00b6315825ee3f30100dd2c93d534e5c02164e857", [:mix], [], "hexpm", "3095cb1fac7bbc78843a8ccd99f1af375d0da1d3ebaa8552e846b73438c0c44f"}, "oauther": {:hex, :oauther, "1.1.1", "7d8b16167bb587ecbcddd3f8792beb9ec3e7b65c1f8ebd86b8dd25318d535752", [:mix], [], "hexpm", "9374f4302045321874cccdc57eb975893643bd69c3b22bf1312dab5f06e5788e"}, "parallel_stream": {:hex, :parallel_stream, "1.0.6", "b967be2b23f0f6787fab7ed681b4c45a215a81481fb62b01a5b750fa8f30f76c", [:mix], [], "hexpm", "639b2e8749e11b87b9eb42f2ad325d161c170b39b288ac8d04c4f31f8f0823eb"}, diff --git a/test/support/clickhouse_setup.ex b/test/support/clickhouse_setup.ex new file mode 100644 index 0000000000000000000000000000000000000000..48a2a23b0052d476e407cd30bbcce22aec5f08d2 --- /dev/null +++ b/test/support/clickhouse_setup.ex @@ -0,0 +1,57 @@ +defmodule Plausible.Test.ClickhouseSetup do + def run() do + create_events() + create_sessions() + end + + def create_events() do + ddl = """ + CREATE TABLE IF NOT EXISTS events ( + timestamp DateTime, + name String, + domain String, + user_id FixedString(64), + hostname String, + pathname String, + referrer Nullable(String), + referrer_source Nullable(String), + initial_referrer Nullable(String), + initial_referrer_source Nullable(String), + country_code Nullable(FixedString(2)), + screen_size Nullable(String), + operating_system Nullable(String), + browser Nullable(String) + ) ENGINE = MergeTree() + PARTITION BY toYYYYMM(timestamp) + ORDER BY (name, domain, timestamp, user_id) + SETTINGS index_granularity = 8192 + """ + + Clickhousex.query(:clickhouse, ddl, [],log: {Plausible.Clickhouse, :log, []}) + end + + def create_sessions() do + ddl = """ + CREATE TABLE IF NOT EXISTS sessions ( + domain String, + user_id FixedString(64), + hostname String, + start DateTime, + is_bounce UInt8, + entry_page Nullable(String), + exit_page Nullable(String), + referrer Nullable(String), + referrer_source Nullable(String), + country_code Nullable(FixedString(2)), + screen_size Nullable(String), + operating_system Nullable(String), + browser Nullable(String) + ) ENGINE = MergeTree() + PARTITION BY toYYYYMM(start) + ORDER BY (domain, start, user_id) + SETTINGS index_granularity = 8192 + """ + + Clickhousex.query(:clickhouse, ddl, [],log: {Plausible.Clickhouse, :log, []}) + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 9a7849359af8e55bfcb6bdc34dd1c4e7249f9066..884d9340703d4803a86893378b6222bbf048b908 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,4 +1,5 @@ {:ok, _} = Application.ensure_all_started(:ex_machina) +Plausible.Test.ClickhouseSetup.run() ExUnit.start() Application.ensure_all_started(:double) Ecto.Adapters.SQL.Sandbox.mode(Plausible.Repo, :manual)