AshPostgres 사용해 보기 (2)

지난 글에 이어서 AshPostgres 튜토리얼을 따라가며 사용법을 배워 본다. 이번 글 에서는 집계(Aggregates)와 계산(Calculations)에 대해 알아 볼 것이다.

집계(Aggregates)

집계(Aggregates)는 관계로 연결된 데이터들을 모아 요약 정보를 제공하는 도구이다. 관계로 연결된 데이터들의 합계, 수, 최대값, 최소값, 평균값 등 통계성 값 들을 얻는데 사용한다. 예제에선 Representative 와 Ticket 이 일대다 관계로 연결되었기 때문에 담당자가 몇개의 티켓을 가지고 있는지 진행중인 티켓은 몇 개 인지 같은 값을 얻는데 사용할 수 있다.

다음 코드를 담당자 리소스 선언에 추가한다.

# in lib/helpdesk/support/representative.ex

  aggregates do
    # 첫번째 인자는 집계의 이름, 두 번째 인자는 관계의 이름
    count :total_tickets, :tickets

    count :open_tickets, :tickets do
      # 표현식으로 필터링 할 수 도 있다
      filter expr(status == :open)
    end

    count :closed_tickets, :tickets do
      filter expr(status == :closed)
    end
  end
# in iex

iex> require Ash.Query

iex> Helpdesk.Support.Representative
|> Ash.Query.filter(closed_tickets < 4)  # closed_tickets 집계가 사용 됨
|> Ash.Query.sort(closed_tickets: :desc) # closed_tickets 집계가 사용 됨
|> Ash.read!()

18:44:34.441 [debug] QUERY OK source="representatives" db=15.7ms queue=0.1ms idle=1406.0ms
SELECT r0."id", r0."name" FROM "representatives" AS r0 LEFT OUTER JOIN LATERAL (SELECT st0."representative_id" AS "representative_id", coalesce(count(*), $1::bigint)::bigint AS "closed_tickets" FROM "public"."tickets" AS st0 WHERE (r0."id" = st0."representative_id") AND (st0."status"::varchar = $2::varchar) GROUP BY st0."representative_id") AS s1 ON TRUE WHERE (coalesce(s1."closed_tickets", $3::bigint)::bigint < $4::bigint) ORDER BY coalesce(s1."closed_tickets", $5::bigint)::bigint DESC [0, :closed, 0, 4, 0]

# 저장된 데이터 상황에 따라 결과는 달라질 수 있음
[
  %Helpdesk.Support.Representative{
    id: "f773d41f-24f8-4d38-a5ef-a98a187de034",
    name: "Joe Armstrong",
    total_tickets: #Ash.NotLoaded<:aggregate, field: :total_tickets>,
    open_tickets: #Ash.NotLoaded<:aggregate, field: :open_tickets>,
    closed_tickets: #Ash.NotLoaded<:aggregate, field: :closed_tickets>,
    tickets: #Ash.NotLoaded<:relationship, field: :tickets>,
    __meta__: #Ecto.Schema.Metadata<:loaded, "representatives">
  },
  %Helpdesk.Support.Representative{
    id: "a4467ac7-6f4e-49b1-bdea-9dc98acb5ac9",
    name: "Test User 2",
    total_tickets: #Ash.NotLoaded<:aggregate, field: :total_tickets>,
    open_tickets: #Ash.NotLoaded<:aggregate, field: :open_tickets>,
    closed_tickets: #Ash.NotLoaded<:aggregate, field: :closed_tickets>,
    tickets: #Ash.NotLoaded<:relationship, field: :tickets>,
    __meta__: #Ecto.Schema.Metadata<:loaded, "representatives">
  }
]

리소스에 total_tickets, open_tickets, closed_tickets 값이 추가된 것을 확인할 수 있다. 기본적으로 명시적 로드를 해야만 쿼리에도 반영되고 값도 획득된다. closed_tickets 값은 필터링에서 사용되었기 때문에 쿼리에 집계가 포함되었으나 해당 값 자체는 로드하지 않았기 때문에 NotLoaded 이다. 명시적으로 값을 로드하는 코드는 다음과 같다.

iex> Helpdesk.Support.Representative 
   |> Ash.Query.filter(closed_tickets < 4) 
   |> Ash.Query.sort(closed_tickets: :desc) 
   |> Ash.Query.load([:closed_tickets, :open_tickets, :total_tickets]) # 명시
   |> Ash.read!()

계산(Calculations)

계산은 집계와 비슷하지만 개별 레코드에 대해 작동한다는 것이 다르다. 그러나 계산은 집계된 값을 참조할 수 있다. 예를들어 할당된 티켓의 상태 비율을 계산하는 코드는 다음과 같이 만들 수 있다.

# in lib/helpdesk/support/representative.ex

  calculations do
    # open_tickets, total_tickets 는 aggregates 값
    calculate :percent_open, :float, expr(open_tickets / total_tickets)
  end

iex> Helpdesk.Support.Representative
|> Ash.Query.filter(percent_open > 0.25)
|> Ash.Query.sort(:percent_open)
|> Ash.Query.load(:percent_open)
|> Ash.read!()


19:00:02.609 [debug] QUERY OK source="representatives" db=4.2ms queue=18.6ms idle=1420.5ms
SELECT r0."id", r0."name", (coalesce(s1."open_tickets", $1::bigint)::bigint::decimal / coalesce(s1."total_tickets", $2::bigint)::bigint::decimal)::float FROM "representatives" AS r0 LEFT OUTER JOIN LATERAL (SELECT st0."representative_id" AS "representative_id", coalesce(count(*) FILTER (WHERE st0."status"::varchar = $3::varchar), $4::bigint)::bigint AS "open_tickets", coalesce(count(*) FILTER (WHERE $5), $6::bigint)::bigint AS "total_tickets" FROM "public"."tickets" AS st0 WHERE (r0."id" = st0."representative_id") GROUP BY st0."representative_id") AS s1 ON TRUE WHERE ((coalesce(s1."open_tickets", $7::bigint)::bigint::decimal / coalesce(s1."total_tickets", $8::bigint)::bigint::decimal)::float > $9::float) ORDER BY (coalesce(s1."open_tickets", $10::bigint)::bigint::decimal / coalesce(s1."total_tickets", $11::bigint)::bigint::decimal)::float [0, 0, :open, 0, true, 0, 0, 0, 0.25, 0, 0]
[
  %Helpdesk.Support.Representative{
    id: "f773d41f-24f8-4d38-a5ef-a98a187de034",
    name: "Joe Armstrong",
    total_tickets: #Ash.NotLoaded<:aggregate, field: :total_tickets>,
    open_tickets: #Ash.NotLoaded<:aggregate, field: :open_tickets>,
    closed_tickets: #Ash.NotLoaded<:aggregate, field: :closed_tickets>,
    percent_open: 0.5,
    tickets: #Ash.NotLoaded<:relationship, field: :tickets>,
    __meta__: #Ecto.Schema.Metadata<:loaded, "representatives">
  },
  %Helpdesk.Support.Representative{
    id: "a4467ac7-6f4e-49b1-bdea-9dc98acb5ac9",
    name: "Test User 2",
    total_tickets: #Ash.NotLoaded<:aggregate, field: :total_tickets>,
    open_tickets: #Ash.NotLoaded<:aggregate, field: :open_tickets>,
    closed_tickets: #Ash.NotLoaded<:aggregate, field: :closed_tickets>,
    percent_open: 0.5,
    tickets: #Ash.NotLoaded<:relationship, field: :tickets>,
    __meta__: #Ecto.Schema.Metadata<:loaded, "representatives">
  }
]

집계과 계산을 잘 활용하면 리소스 별 통계성 값 들을 꽤 깔끔하게 코드로 표현할 수 있다. 예제의 코드들은 Ash 가 만들어 주는 쿼리들 이었지만 원한다면 직접 커스텀 집계 쿼리를 만들 수 도 있다. 적절히 사용하면 리소스의 명료한 표현에 큰 도움이 될 것 같다.

AshPostgres 사용해 보기

Ash는 도메인을 중심으로 리소스를 정의하면 API, 데이터 레이어 등의 구현을 자동으로 생성해주는 Elixir의 선언적 백엔드 프레임워크이다. 리소스를 다루다 보면 리소스의 상태를 데이터베이스에 저장할 필요가 있는데 이 때 AshPostgres 익스텐션을 사용할 수 있다. 이름에서 알 수 있듯 PostgreSQL 데이터베이스에 리소스의 상태를 저장하는 기능을 제공한다. 이번 글 에서는 AshPostgres의 튜토리얼을 따라가며 어떻게 사용하는지 살펴본다.

이 글의 예제 코드는 https://hexdocs.pm/ash/get-started.html 에서 만든 코드부터 시작하기 때문에 코드를 이어서 실행하기 위해선 위의 Ash 튜토리얼을 먼저 진행해 보는 것을 권장한다.

설치 및 설정

리소스를 Postgres 데이터베이스에 저장 하는 익스텐션이니 당연히 먼저 Postgres 데이터베이스 가 설치되어 있어야 한다.

Igniter 를 가지고 AshPostgres 설치(설정) 한다. 공식 문서에는 수동으로 직접 설정하는 방법도 있지만 추가해야 할 부분이 많아 다음 mix 명령어를 통해 쉽게 설정하는 것을 권장한다.

$ mix igniter.install ash_postgres

AshPostgres 관련 기본 설정들이 추가되고 postgres 접속 정보도 config/ 에 추가 된다. 접속 정보는 설치된 postgres 상황에 맞게 변경한다.

# in config/dev.exs

import Config

# Configure your database
config :helpdesk, Helpdesk.Repo,
  username: "postgres",
  password: "postgres",
  hostname: "localhost",
  database: "helpdesk_dev",
  port: 5432,
  show_sensitive_data_on_connection_error: true,
  pool_size: 10

다음은 리소스의 Data Layer 를 AshPostgres 로 변경한다. 다음 mix 명령어를 통해 쉽게 변경할 수도 있고 직접 리소스의 코드를 변경해도 된다.

$ mix ash.patch.extend Helpdesk.Support.Ticket postgres
$ mix ash.patch.extend Helpdesk.Support.Representative postgres

실제 코드는 다음과 같이 바뀐다.

# in lib/helpdesk/support/ticket.ex

  use Ash.Resource,
    domain: Helpdesk.Support,
    data_layer: AshPostgres.DataLayer

  postgres do
    table "tickets"
    repo Helpdesk.Repo
  end

# in lib/helpdesk/support/representative.ex

  use Ash.Resource,
    domain: Helpdesk.Support,
    data_layer: AshPostgres.DataLayer

  postgres do
    table "representatives"
    repo Helpdesk.Repo
  end

postgres 키워드와 함께 table 을 지정할 수 있다.

마이그레이션

다음은 마이그레이션을 생성한다. 마이그레이션은 실제로 데이터베이스의 변경을 가하는 일종의 스크립트이다. 다음 mix 명령어를 실행한다. add_tickets_and_representatives 부분은 이번 마이그레이션이 어떤 일을 수행하는지 적당한 이름을 지어주면 된다.

$ mix ash.codegen add_tickets_and_representatives

리소스에 postgres 등 AshPostgres 관련 선언이 추가되었기 때문에 리소스가 변경된 이력을 바탕으로 마이그레이션 코드를 자동으로 만들어 준다. Ash 를 사용하지 않는다면 이런 데이터베이스 변경 건에 대해 마이그레이션 스크립트를 직접 만들어야 하는 수고가 있는데 Ash Codegen 기능이 이러한 수고를 덜어 준다. priv/repo/migrations 폴더에 마이그레이션 관련 스크립트가 만들어 진다.

자동으로 만들어진 마이그레이션은 다음과 같다. Ecto.Migration 기반이고 up 메소드가 실행 스크립트, down 메소드가 롤백 스크립트이다.

defmodule Helpdesk.Repo.Migrations.AddTicketsAndRepresentatives do
  @moduledoc """
  Updates resources based on their most recent snapshots.

  This file was autogenerated with `mix ash_postgres.generate_migrations`
  """

  use Ecto.Migration

  def up do
    create table(:tickets, primary_key: false) do
      add(:id, :uuid, null: false, default: fragment("gen_random_uuid()"), primary_key: true)
      add(:subject, :text, null: false)
      add(:status, :text, null: false, default: "open")
      add(:representative_id, :uuid)
    end

    create table(:representatives, primary_key: false) do
      add(:id, :uuid, null: false, default: fragment("gen_random_uuid()"), primary_key: true)
    end

    alter table(:tickets) do
      modify(
        :representative_id,
        references(:representatives,
          column: :id,
          name: "tickets_representative_id_fkey",
          type: :uuid,
          prefix: "public"
        )
      )
    end

    alter table(:representatives) do
      add(:name, :text)
    end
  end

  def down do
    alter table(:representatives) do
      remove(:name)
    end

    drop(constraint(:tickets, "tickets_representative_id_fkey"))

    alter table(:tickets) do
      modify(:representative_id, :uuid)
    end

    drop(table(:representatives))

    drop(table(:tickets))
  end
end

Ecto 와는 다르게 snapshot 파일이 만들어 지는데 이는 마이그레이션 시점의 리소스 메타데이터를 JSON 형태로 저장하는 것으로 보인다. 리소스 선언이 변경될 때 resource_snapshot 과 비교해 자동으로 마이그레이션을 생성하기 위한 것으로 보인다.

여기까지 마이그레이션을 만들었다면 만들어진 마이그레이션을 실제로 수행하는 명령어는 다음과 같다.

$ mix ash.setup

Getting extensions in current project...
Running setup for AshPostgres.DataLayer...
The database for Helpdesk.Repo has already been created

17:52:39.685 [info] == Running 20250903084048 Helpdesk.Repo.Migrations.AddTicketsAndRepresentatives.up/0 forward

17:52:39.687 [info] create table tickets

17:52:39.706 [info] create table representatives

17:52:39.708 [info] alter table tickets

17:52:39.714 [info] alter table representatives

17:52:39.720 [info] == Migrated 20250903084048 in 0.0s

실행하면 실제로 SQL 쿼리 로그가 찍히면서 관련 테이블이 만들어 지는 것을 확인할 수 있을 것이다.

이제 몇 가지 테스트 코드를 실행해서 실제로 데이터베이스에 리소스의 상태가 저장되는지 확인해 보자. 아래 코드는 모두 iex 세션에서 실행한다.

iex> require Ash.Query

iex> representative = (
  Helpdesk.Support.Representative
  |> Ash.Changeset.for_create(:create, %{name: "Joe Armstrong"})
  |> Ash.create!()
)

iex> for i <- 0..5 do
  ticket =
    Helpdesk.Support.Ticket
    |> Ash.Changeset.for_create(:open, %{subject: "Issue #{i}"})
    |> Ash.create!()
    |> Ash.Changeset.for_update(:assign, %{representative_id: representative.id})
    |> Ash.update!()

  if rem(i, 2) == 0 do
    ticket
    |> Ash.Changeset.for_update(:close)
    |> Ash.update!()
  end
end


# Show the tickets where the subject contains "2"
iex> Helpdesk.Support.Ticket
|> Ash.Query.filter(contains(subject, "2"))
|> Ash.read!()

# Show the tickets that are closed and their subject does not contain "4"
iex> Helpdesk.Support.Ticket
|> Ash.Query.filter(status == :closed and not(contains(subject, "4")))
|> Ash.read!()

Ash 튜토리얼에서 테스트로 사용했던 코드와 완전히 일치 한다. 리소스를 다루는 코드는 그대로이지만 리소스 선언 상 data_layer 가 AshPosgres 로 변경되었기 때문에 리소스는 ETS(메모리)가 아닌 Postgres 데이터베이스에 저장 되는 것을 확인할 수 있다. 실제로 실행 결과에서도 중간중간 쿼리가 만들어 진 것을 확인할 수 있다.

여기까지 AshPostgres 를 Ash 가 적용된 프로젝트에 처음 추가하는 방법을 살펴 봤다. 다음 글 에서는 관련 데이터 그룹에 대한 요약 정보를 제공하는 집계와 계산 함수에 대해 알아보겠다.