Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add support for per-topic configuration #131

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use Mix.Config
import Config

config :kaffe,
kafka_mod: :brod,
Expand Down
2 changes: 1 addition & 1 deletion config/dev.exs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
use Mix.Config
import Config
42 changes: 22 additions & 20 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
use Mix.Config
import Config

config :kaffe,
kafka_mod: TestBrod,
group_subscriber_mod: TestBrodGroupSubscriber,
test_partition_count: 32,
consumer: [
endpoints: [kafka: 9092],
topics: ["kaffe-test"],
consumer_group: "kaffe-test-group",
message_handler: SilentMessage,
async_message_ack: false,
offset_commit_interval_seconds: 10,
start_with_earliest_message: true,
rebalance_delay_ms: 100,
max_bytes: 10_000,
subscriber_retries: 1,
subscriber_retry_delay_ms: 5,
client_down_retry_expire: 15_000,
sasl: %{
mechanism: :plain,
login: System.get_env("KAFFE_PRODUCER_USER"),
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
}
],
consumers: %{
"subscriber_name" => [
endpoints: [kafka: 9092],
topics: ["kaffe-test"],
consumer_group: "kaffe-test-group",
message_handler: SilentMessage,
async_message_ack: false,
offset_commit_interval_seconds: 10,
start_with_earliest_message: true,
rebalance_delay_ms: 100,
max_bytes: 10_000,
subscriber_retries: 1,
subscriber_retry_delay_ms: 5,
client_down_retry_expire: 15_000,
sasl: %{
mechanism: :plain,
login: System.get_env("KAFFE_PRODUCER_USER"),
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
}
]
},
producer: [
endpoints: [kafka: 9092],
topics: ["kaffe-test"],
Expand Down
158 changes: 92 additions & 66 deletions lib/kaffe/config/consumer.ex
Original file line number Diff line number Diff line change
@@ -1,138 +1,164 @@
defmodule Kaffe.Config.Consumer do
import Kaffe.Config, only: [heroku_kafka_endpoints: 0, parse_endpoints: 1]
require Logger

def configuration do
def configuration(idx) do
%{
endpoints: endpoints(),
subscriber_name: subscriber_name(),
consumer_group: consumer_group(),
topics: topics(),
group_config: consumer_group_config(),
consumer_config: client_consumer_config(),
message_handler: message_handler(),
async_message_ack: async_message_ack(),
rebalance_delay_ms: rebalance_delay_ms(),
max_bytes: max_bytes(),
min_bytes: min_bytes(),
max_wait_time: max_wait_time(),
subscriber_retries: subscriber_retries(),
subscriber_retry_delay_ms: subscriber_retry_delay_ms(),
offset_reset_policy: offset_reset_policy(),
worker_allocation_strategy: worker_allocation_strategy(),
client_down_retry_expire: client_down_retry_expire()
endpoints: endpoints(idx),
subscriber_name: subscriber_name(idx),
consumer_group: consumer_group(idx),
topics: topics(idx),
group_config: consumer_group_config(idx),
consumer_config: client_consumer_config(idx),
message_handler: message_handler(idx),
async_message_ack: async_message_ack(idx),
rebalance_delay_ms: rebalance_delay_ms(idx),
max_bytes: max_bytes(idx),
min_bytes: min_bytes(idx),
max_wait_time: max_wait_time(idx),
subscriber_retries: subscriber_retries(idx),
subscriber_retry_delay_ms: subscriber_retry_delay_ms(idx),
offset_reset_policy: offset_reset_policy(idx),
worker_allocation_strategy: worker_allocation_strategy(idx),
client_down_retry_expire: client_down_retry_expire(idx)
}
end

def consumer_group, do: config_get!(:consumer_group)
def consumer_group(idx), do: config_get!(idx, :consumer_group)

def subscriber_name, do: config_get(:subscriber_name, consumer_group()) |> String.to_atom()
def subscriber_name(idx), do: config_get(idx, :subscriber_name, consumer_group(idx)) |> String.to_atom()

def topics, do: config_get!(:topics)
def topics(idx), do: config_get!(idx, :topics)

def message_handler, do: config_get!(:message_handler)
def message_handler(idx), do: config_get!(idx, :message_handler)

def async_message_ack, do: config_get(:async_message_ack, false)
def async_message_ack(idx), do: config_get(idx, :async_message_ack, false)

def endpoints do
if heroku_kafka?() do
def endpoints(idx) do
if heroku_kafka?(idx) do
heroku_kafka_endpoints()
else
parse_endpoints(config_get!(:endpoints))
parse_endpoints(config_get!(idx, :endpoints))
end
end

def consumer_group_config do
def consumer_group_config(idx) do
[
offset_commit_policy: :commit_to_kafka_v2,
offset_commit_interval_seconds: config_get(:offset_commit_interval_seconds, 5)
offset_commit_interval_seconds: config_get(idx, :offset_commit_interval_seconds, 5)
]
end

def rebalance_delay_ms do
config_get(:rebalance_delay_ms, 10_000)
def rebalance_delay_ms(idx) do
config_get(idx, :rebalance_delay_ms, 10_000)
end

def max_bytes do
config_get(:max_bytes, 1_000_000)
def max_bytes(idx) do
config_get(idx, :max_bytes, 1_000_000)
end

def min_bytes do
config_get(:min_bytes, 0)
def min_bytes(idx) do
config_get(idx, :min_bytes, 0)
end

def max_wait_time do
config_get(:max_wait_time, 10_000)
def max_wait_time(idx) do
config_get(idx, :max_wait_time, 10_000)
end

def subscriber_retries do
config_get(:subscriber_retries, 5)
def subscriber_retries(idx) do
config_get(idx, :subscriber_retries, 5)
end

def subscriber_retry_delay_ms do
config_get(:subscriber_retry_delay_ms, 5_000)
def subscriber_retry_delay_ms(idx) do
config_get(idx, :subscriber_retry_delay_ms, 5_000)
end

def client_consumer_config do
default_client_consumer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options() ++ ssl_options()
def client_consumer_config(idx) do
default_client_consumer_config(idx) ++ maybe_heroku_kafka_ssl(idx) ++ sasl_options(idx) ++ ssl_options(idx)
end

def sasl_options do
:sasl
|> config_get(%{})
def sasl_options(idx) do
idx
|> config_get(:sasl, %{})
|> Kaffe.Config.sasl_config()
end

def ssl_options do
:ssl
|> config_get(false)
def ssl_options(idx) do
idx
|> config_get(:ssl, false)
|> Kaffe.Config.ssl_config()
end

def default_client_consumer_config do
def default_client_consumer_config(idx) do
[
auto_start_producers: false,
allow_topic_auto_creation: false,
begin_offset: begin_offset()
begin_offset: begin_offset(idx)
]
end

def begin_offset do
case config_get(:start_with_earliest_message, false) do
def begin_offset(idx) do
case config_get(idx, :start_with_earliest_message, false) do
true -> :earliest
false -> -1
end
end

def offset_reset_policy do
config_get(:offset_reset_policy, :reset_by_subscriber)
def offset_reset_policy(idx) do
config_get(idx, :offset_reset_policy, :reset_by_subscriber)
end

def worker_allocation_strategy do
config_get(:worker_allocation_strategy, :worker_per_partition)
def worker_allocation_strategy(idx) do
config_get(idx, :worker_allocation_strategy, :worker_per_partition)
end

def client_down_retry_expire do
config_get(:client_down_retry_expire, 30_000)
def client_down_retry_expire(idx) do
config_get(idx, :client_down_retry_expire, 30_000)
end

def maybe_heroku_kafka_ssl do
case heroku_kafka?() do
def maybe_heroku_kafka_ssl(idx) do
case heroku_kafka?(idx) do
true -> Kaffe.Config.ssl_config()
false -> []
end
end

def heroku_kafka? do
config_get(:heroku_kafka_env, false)
def heroku_kafka?(idx) do
config_get(idx, :heroku_kafka_env, false)
end

def config_get!(key) do
Application.get_env(:kaffe, :consumer)
# The subscriber name is the key of the :consumers map itself, so it is
# answered directly rather than looked up inside the per-consumer options.
def config_get!(idx, :subscriber_name), do: idx

# Fetches a required option for the consumer registered under `idx`.
# Raises (via `Keyword.fetch!/2`) when the option is absent.
def config_get!(idx, key) do
  consumer_options = Map.get(Application.get_env(:kaffe, :consumers), idx)
  Keyword.fetch!(consumer_options, key)
end

def config_get(key, default) do
Application.get_env(:kaffe, :consumer)
# The subscriber name is the key of the :consumers map itself; the default
# is irrelevant because the key is always known to the caller.
def config_get(idx, :subscriber_name, _default), do: idx

# Fetches an optional option for the consumer registered under `idx`,
# returning `default` when the option is not set.
def config_get(idx, key, default) do
  consumer_options = Map.get(Application.get_env(:kaffe, :consumers), idx)
  Keyword.get(consumer_options, key, default)
end

# Ensures the application is using the new per-subscriber configuration
# style (`:kaffe, :consumers` — a map of subscriber name => options).
#
# Kaffe previously used a single `:kaffe, :consumer` keyword list. When the
# new `:consumers` key is absent, this raises with a ready-to-paste example
# built from the legacy `:consumer` config (if any) so users can migrate
# with minimal effort. Returns `nil` when the configuration is valid.
def validate_configuration!() do
  if Application.get_env(:kaffe, :consumers) == nil do
    # Fall back to an empty keyword list so inspect/1 below always works
    # even when no legacy :consumer config exists either.
    old_config = Application.get_env(:kaffe, :consumer) || []
    subscriber_name = old_config |> Keyword.get(:subscriber_name, "subscriber_name")

    raise("""
    UPDATE CONSUMERS CONFIG:

    Set :kaffe, :consumers to a map with subscriber names as keys and config as values.
    For example:

    config :kaffe,
    consumers: %{
    #{inspect(subscriber_name)} => #{inspect(old_config)}
    }
    """)
  end
end
end
8 changes: 4 additions & 4 deletions lib/kaffe/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ defmodule Kaffe.Consumer do

@behaviour :brod_group_subscriber

@kafka Application.get_env(:kaffe, :kafka_mod, :brod)
@group_subscriber Application.get_env(:kaffe, :group_subscriber_mod, :brod_group_subscriber)
@kafka Application.compile_env(:kaffe, :kafka_mod, :brod)
@group_subscriber Application.compile_env(:kaffe, :group_subscriber_mod, :brod_group_subscriber)

require Record
import Record, only: [defrecord: 2, extract: 2]
Expand Down Expand Up @@ -61,8 +61,8 @@ defmodule Kaffe.Consumer do
acknowledgement you will be able to process messages faster but will need to
take on the burden of ensuring no messages are lost.
"""
def start_link do
config = Kaffe.Config.Consumer.configuration()
def start_link(config_idx) do
config = Kaffe.Config.Consumer.configuration(config_idx)

@kafka.start_link_group_subscriber(
config.subscriber_name,
Expand Down
Loading