-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
0.2.0 Introduce client Clone task #12
Changes from all commits
1e69564
52dcbd6
ee7ac96
f61539e
d23154e
26d1906
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,7 @@ jobs: | |
strategy: | ||
matrix: | ||
elixir: [1.17.2] | ||
otp: [26.2.5.1, 27.0] | ||
otp: [26.2.5.1, 27.0.1] | ||
steps: | ||
- name: Cancel Previous Runs | ||
uses: styfle/[email protected] | ||
|
@@ -61,7 +61,7 @@ jobs: | |
strategy: | ||
matrix: | ||
elixir: [1.17.2] | ||
otp: [26.2.5.1, 27.0] | ||
otp: [26.2.5.1, 27.0.1] | ||
steps: | ||
- name: Cancel Previous Runs | ||
uses: styfle/[email protected] | ||
|
@@ -126,7 +126,7 @@ jobs: | |
fail-fast: false | ||
matrix: | ||
elixir: [1.17.2] | ||
otp: [26.2.5.1, 27.0] | ||
otp: [26.2.5.1, 27.0.1] | ||
steps: | ||
- name: Cancel Previous Runs | ||
uses: styfle/[email protected] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
elixir 1.17.2 | ||
erlang 27.0 | ||
erlang 27.0.1 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
defmodule Mix.Tasks.Umwelt.Clone do | ||
@moduledoc "Clones phase modules and code" | ||
@shortdoc "The code puller" | ||
use Mix.Task | ||
require Logger | ||
|
||
@impl Mix.Task | ||
def run([phase_id]) do | ||
case System.get_env("UMWELT_TOKEN", "no_token") do | ||
"no_token" -> | ||
""" | ||
Token not found in env! | ||
You can get it on umwelt.dev/auth/profile and do | ||
|
||
export UMWELT_TOKEN="token" | ||
|
||
or pass it directly in | ||
|
||
mix clone phase_id "token" | ||
|
||
""" | ||
|> Logger.warning() | ||
|
||
token -> | ||
run([phase_id, token]) | ||
end | ||
end | ||
|
||
@impl Mix.Task | ||
def run([phase_id, token]) do | ||
Umwelt.Client.Application.start(:normal, []) | ||
|
||
Umwelt.Client.Supervisor | ||
|> Process.whereis() | ||
|> Process.monitor() | ||
sovetnik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
%{phase_id: phase_id, token: token} | ||
|> assign_host() | ||
|> assign_port() | ||
|> Umwelt.Client.pull() | ||
|
||
receive do | ||
{:DOWN, _, :process, _, _} -> | ||
Logger.info("Done!") | ||
|
||
other -> | ||
Logger.warning(inspect(other)) | ||
end | ||
end | ||
|
||
defp assign_host(params) do | ||
host = | ||
case Mix.env() do | ||
:test -> "http://localhost" | ||
_ -> System.get_env("UMWELT_HOST", "https://umwelt.dev") | ||
end | ||
|
||
Map.put(params, :api_host, host) | ||
end | ||
|
||
defp assign_port(params) do | ||
port = | ||
case Mix.env() do | ||
:dev -> | ||
case params.api_host do | ||
"http://localhost" -> 4000 | ||
"https://umwelt.dev" -> 443 | ||
end | ||
|
||
:test -> | ||
Application.get_env(:umwelt, :api_port) | ||
end | ||
|
||
Map.put(params, :port, port) | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
defmodule Umwelt.Client do | ||
@moduledoc "Client for umwelt.dev" | ||
|
||
alias Umwelt.Client.Clone | ||
|
||
require Logger | ||
|
||
def pull(params) do | ||
GenServer.cast(Clone, {:pull, params}) | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
defmodule Umwelt.Client.Agent do | ||
@moduledoc "Keeps pulling metadata" | ||
|
||
use Agent | ||
require Logger | ||
|
||
def start_link(_args) do | ||
Agent.start_link( | ||
fn -> | ||
%{ | ||
modules: %{}, | ||
waiting: [], | ||
fetching: [], | ||
fetched: [], | ||
writing: [], | ||
written: [], | ||
total: 0 | ||
} | ||
end, | ||
name: __MODULE__ | ||
) | ||
end | ||
|
||
def all_waiting, do: Agent.get(__MODULE__, fn state -> state.waiting end) | ||
|
||
def completed?, | ||
do: Agent.get(__MODULE__, fn state -> state.total == Enum.count(state.written) end) | ||
|
||
def state, do: Agent.get(__MODULE__, fn state -> state end) | ||
def total, do: Agent.get(__MODULE__, fn state -> state.total end) | ||
def ready, do: Agent.get(__MODULE__, fn state -> Enum.count(state.written) end) | ||
|
||
def add_modules(modules) do | ||
Agent.update(__MODULE__, fn state -> | ||
%{ | ||
state | ||
| modules: modules, | ||
waiting: Map.keys(modules), | ||
fetching: [], | ||
fetched: [], | ||
writing: [], | ||
written: [], | ||
total: map_size(modules) | ||
} | ||
end) | ||
end | ||
|
||
def next_waiting do | ||
Agent.get_and_update(__MODULE__, fn state -> | ||
case state.waiting do | ||
[mod_name | modules] -> | ||
{ | ||
%{id: state.modules[mod_name], name: mod_name}, | ||
state | ||
|> Map.put(:waiting, modules) | ||
|> Map.put(:fetching, [mod_name | state.fetching]) | ||
} | ||
|
||
[] -> | ||
{nil, state} | ||
end | ||
end) | ||
end | ||
|
||
def update_status(mod_name, :fetched) do | ||
Agent.update(__MODULE__, fn state -> | ||
state | ||
|> Map.put(:fetching, List.delete(state.fetching, mod_name)) | ||
|> Map.put(:fetched, [mod_name | state.fetched]) | ||
end) | ||
end | ||
|
||
def update_status(mod_name, :writing) do | ||
Agent.update(__MODULE__, fn state -> | ||
state | ||
|> Map.put(:writing, [mod_name | state.writing]) | ||
end) | ||
end | ||
|
||
def update_status(mod_name, :written) do | ||
Agent.update(__MODULE__, fn state -> | ||
state | ||
|> Map.put(:writing, List.delete(state.writing, mod_name)) | ||
|> Map.put(:written, [mod_name | state.written]) | ||
end) | ||
|
||
render_progress() | ||
end | ||
|
||
def render_progress do | ||
ProgressBar.render(ready(), total(), suffix: :count) | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
defmodule Umwelt.Client.Application do | ||
@moduledoc "Client app & Supervisor" | ||
|
||
use Application | ||
|
||
def start(_type, _args) do | ||
children = [ | ||
{Umwelt.Client.Agent, []}, | ||
{Task.Supervisor, name: Umwelt.Client.FetcherSupervisor}, | ||
{Task.Supervisor, name: Umwelt.Client.WriterSupervisor}, | ||
{Umwelt.Client.Clone, []} | ||
] | ||
|
||
opts = [strategy: :one_for_one, name: Umwelt.Client.Supervisor] | ||
Supervisor.start_link(children, opts) | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
defmodule Umwelt.Client.Clone do | ||
@moduledoc "Clone main process" | ||
|
||
use GenServer, restart: :transient, shutdown: 10_000 | ||
require Logger | ||
|
||
alias Umwelt.Client | ||
|
||
def start_link(_), | ||
do: | ||
GenServer.start_link( | ||
__MODULE__, | ||
%{phase_id: nil, port: nil}, | ||
name: __MODULE__ | ||
) | ||
|
||
def init(state), do: {:ok, state} | ||
|
||
def handle_cast({:pull, params}, _state) do | ||
{:noreply, params, {:continue, :fetch_modules}} | ||
end | ||
|
||
def handle_continue(:fetch_modules, state) do | ||
case Client.Request.fetch_modules(state) do | ||
{:ok, modules} -> | ||
Logger.info("Fetching modules: #{inspect(Map.keys(modules))}") | ||
modules |> Client.Agent.add_modules() | ||
|
||
{:error, reason} -> | ||
Logger.error("Failed to fetch modules: #{inspect(reason)}. Stopping...") | ||
Supervisor.stop(Client.Supervisor) | ||
end | ||
|
||
{:noreply, state, {:continue, :start_pulling}} | ||
end | ||
|
||
def handle_continue(:start_pulling, state) do | ||
Client.Agent.all_waiting() | ||
|> Enum.each(fn _ -> spawn_fetcher(state) end) | ||
|
||
{:noreply, state} | ||
end | ||
|
||
def handle_info({:fetched, %{name: name, code: code}}, state) do | ||
Client.Agent.update_status(name, :fetched) | ||
spawn_writer(%{name: name, code: code}) | ||
{:noreply, state} | ||
end | ||
|
||
def handle_info({:fetch_failed, module}, state) do | ||
Logger.warning("Respawning failed fetcher for module #{module.name}") | ||
Client.Fetcher.start_link(module) | ||
{:noreply, state} | ||
end | ||
|
||
def handle_info({:written, mod_name}, state) do | ||
Client.Agent.update_status(mod_name, :written) | ||
send(self(), :maybe_stop) | ||
{:noreply, state} | ||
end | ||
|
||
def handle_info(:maybe_stop, state) do | ||
if Client.Agent.completed?() do | ||
Logger.debug("All modules processed. Stopping application.") | ||
:timer.sleep(99) | ||
Supervisor.stop(Client.Supervisor) | ||
end | ||
|
||
{:noreply, state} | ||
end | ||
|
||
defp spawn_fetcher(state) do | ||
case Client.Agent.next_waiting() do | ||
nil -> | ||
Logger.debug("No more modules to fetch") | ||
:ok | ||
|
||
module -> | ||
Logger.debug("Spawning fetcher for module #{inspect(module.name)}") | ||
Client.Fetcher.start_link(Map.merge(module, state)) | ||
end | ||
end | ||
|
||
defp spawn_writer(%{name: name} = module) do | ||
Logger.debug("Spawning writer for module #{inspect(name)}") | ||
Client.Agent.update_status(name, :writing) | ||
Client.Writer.start_link(module) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would better organize writers under their own There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe |
||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mix.shell.info/1
makes it fancier, see this for an inspiration.