Skip to content

Commit

Permalink
WIP: experimental socket.erl code and hacks
Browse files Browse the repository at this point in the history
  • Loading branch information
RoadRunnr committed Apr 15, 2024
1 parent 5fc23c6 commit 53de024
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 76 deletions.
52 changes: 28 additions & 24 deletions src/eradius_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -162,28 +162,30 @@ restore_upstream_server({ServerIP, Port, Retries, InitialRetries}) ->
proceed_response(Request, {ok, Response, Secret, Authenticator}, _Peer = {_ServerName, {ServerIP, Port}}, TS1, MetricsInfo, Options) ->
update_client_request(Request#radius_request.cmd, MetricsInfo, erlang:monotonic_time() - TS1, Request),
update_client_responses(MetricsInfo),
case eradius_lib:decode_request(Response, Secret, Authenticator) of
{bad_pdu, "Message-Authenticator Attribute is invalid" = Reason} ->
update_client_response(bad_authenticator, MetricsInfo, Request),
?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]),
noreply;
{bad_pdu, "Authenticator Attribute is invalid" = Reason} ->
update_client_response(bad_authenticator, MetricsInfo, Request),
?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]),
noreply;
{bad_pdu, "unknown request type" = Reason} ->
update_client_response(unknown_req_type, MetricsInfo, Request),
?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]),
noreply;
{bad_pdu, Reason} ->
update_client_response(dropped, MetricsInfo, Request),
?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]),
maybe_failover(Request, noreply, {ServerIP, Port}, Options);
Decoded ->
update_server_status_metric(ServerIP, Port, true, Options),
update_client_response(Decoded#radius_request.cmd, MetricsInfo, Request),
{ok, Response, Authenticator}
end;
{ok, Response, Authenticator};

%% case eradius_lib:decode_request(Response, Secret, Authenticator) of
%% {bad_pdu, "Message-Authenticator Attribute is invalid" = Reason} ->
%% update_client_response(bad_authenticator, MetricsInfo, Request),
%% ?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]),
%% noreply;
%% {bad_pdu, "Authenticator Attribute is invalid" = Reason} ->
%% update_client_response(bad_authenticator, MetricsInfo, Request),
%% ?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]),
%% noreply;
%% {bad_pdu, "unknown request type" = Reason} ->
%% update_client_response(unknown_req_type, MetricsInfo, Request),
%% ?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]),
%% noreply;
%% {bad_pdu, Reason} ->
%% update_client_response(dropped, MetricsInfo, Request),
%% ?LOG(error, "~s INF: Noreply for request ~p. Could not decode the request, reason: ~s", [printable_peer(ServerIP, Port), Request, Reason]),
%% maybe_failover(Request, noreply, {ServerIP, Port}, Options);
%% Decoded ->
%% update_server_status_metric(ServerIP, Port, true, Options),
%% update_client_response(Decoded#radius_request.cmd, MetricsInfo, Request),
%% {ok, Response, Authenticator}
%% end;

proceed_response(Request, Response, {_ServerName, {ServerIP, Port}}, TS1, MetricsInfo, Options) ->
update_client_responses(MetricsInfo),
Expand Down Expand Up @@ -404,7 +406,7 @@ configure(State) ->

%% private
prepare_pools() ->
ets:new(?MODULE, [ordered_set, public, named_table, {keypos, 1}, {write_concurrency,true}]),
ets:new(?MODULE, [ordered_set, public, named_table, {keypos, 1}, {read_concurrency,true}]),
lists:foreach(fun({_PoolName, Servers}) -> prepare_pool(Servers) end, application:get_env(eradius, servers_pool, [])),
lists:foreach(fun(Server) -> store_upstream_servers(Server) end, application:get_env(eradius, servers, [])),
init_server_status_metrics().
Expand Down Expand Up @@ -510,7 +512,9 @@ next_port_and_req_id(Peer, NumberOfPorts, Counters) ->
NextReqId = 0
end,
NewCounters = Counters#{Peer => {NextPortIdx, NextReqId}},
{NextPortIdx, NextReqId, NewCounters}.
R = {NextPortIdx, NextReqId, NewCounters},
%% ?LOG(info, "~s: ~p", [?FUNCTION_NAME, R]),
R.

find_socket_process(PortIdx, Sockets, SocketIP, Sup) ->
case array:get(PortIdx, Sockets) of
Expand Down
134 changes: 97 additions & 37 deletions src/eradius_client_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,50 @@
-export([start/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-record(state, {client, socket, pending, mode, counter}).
-record(state, {client, socket, pending, mode, active_n, counter}).

-include_lib("kernel/include/logger.hrl").

start(SocketIP, Client, PortIdx) ->
gen_server:start_link(?MODULE, [SocketIP, Client, PortIdx], []).

init([SocketIP, Client, PortIdx]) ->
Client ! {PortIdx, self()},
case SocketIP of
undefined ->
ExtraOptions = [];
SocketIP when is_tuple(SocketIP) ->
ExtraOptions = [{ip, SocketIP}]
end,
RecBuf = application:get_env(eradius, recbuf, 8192),
SndBuf = application:get_env(eradius, sndbuf, 131072),
{ok, Socket} = gen_udp:open(0, [{active, once}, binary , {recbuf, RecBuf}, {sndbuf, SndBuf} | ExtraOptions]),
{ok, #state{client = Client, socket = Socket, pending = maps:new(), mode = active, counter = 0}}.
RecBuf = application:get_env(eradius, recbuf, 256*1024),
SndBuf = application:get_env(eradius, sndbuf, 256*1024),

SockAddr =
case SocketIP of
undefined -> any;
_ when is_tuple(SocketIP) -> SocketIP
end,
{ok, Socket} = socket:open(inet, dgram, udp),
ok = socket:bind(Socket, #{family => inet, port => 0, addr => SockAddr}),
ok = socket:setopt(Socket, socket, rcvbuf, RecBuf),
ok = socket:setopt(Socket, socket, sndbuf, SndBuf),

self() ! {'$socket', Socket, select, undefined},

State = #state{client = Client,
socket = Socket,
pending = maps:new(),
mode = active,
active_n = 100,
counter = 0},
{ok, State}.

handle_call(_Request, _From, State) ->
{noreply, State}.

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info({'$socket', Socket, select, Info}, #state{socket = Socket} = State) ->
handle_input(Socket, Info, State);

handle_info({SenderPid, send_request, {IP, Port}, ReqId, EncRequest},
State = #state{socket = Socket, pending = Pending, counter = Counter}) ->
case gen_udp:send(Socket, IP, Port, EncRequest) of
case socket:sendto(Socket, EncRequest, #{family => inet, port => Port, addr => IP}) of
ok ->
ReqKey = {IP, Port, ReqId},
NPending = maps:put(ReqKey, SenderPid, Pending),
Expand All @@ -41,31 +58,6 @@ handle_info({SenderPid, send_request, {IP, Port}, ReqId, EncRequest},
{noreply, State}
end;

handle_info({udp, Socket, FromIP, FromPort, EncRequest},
State = #state{socket = Socket, pending = Pending, mode = Mode, counter = Counter}) ->
case eradius_lib:decode_request_id(EncRequest) of
{ReqId, EncRequest} ->
case maps:find({FromIP, FromPort, ReqId}, Pending) of
error ->
%% discard reply because we didn't expect it
inet:setopts(Socket, [{active, once}]),
{noreply, State};
{ok, WaitingSender} ->
WaitingSender ! {self(), response, ReqId, EncRequest},
inet:setopts(Socket, [{active, once}]),
NPending = maps:remove({FromIP, FromPort, ReqId}, Pending),
NState = State#state{pending = NPending, counter = Counter-1},
case {Mode, Counter-1} of
{inactive, 0} -> {stop, normal, NState};
_ -> {noreply, NState}
end
end;
{bad_pdu, _} ->
%% discard reply because it was malformed
inet:setopts(Socket, [{active, once}]),
{noreply, State}
end;

handle_info(close, State = #state{counter = Counter}) ->
case Counter of
0 -> {stop, normal, State};
Expand All @@ -80,3 +72,71 @@ terminate(_Reason, _State) ->

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

handle_input(Socket, Info, #state{active_n = ActiveN} = State) ->
handle_input(Socket, Info, 0, ActiveN, State).

handle_input(_Socket, _Info, _Cnt, _Max, #state{mode = inactive, counter = 0} = State) ->
{stop, normal, State};
handle_input(Socket, _Info, Cnt, Max, State0)
when Cnt >= Max ->
%% break the loop and restart
self() ! {'$socket', Socket, select, undefined},
{noreply, State0};

handle_input(Socket, Info, Cnt, Max, State0) ->
case socket:recvfrom(Socket, 0, [], nowait) of
{error, _} ->
State = handle_err_input(Socket, State0),
handle_input(Socket, Info, Cnt + 1, Max, State);

{ok, {#{addr := IP, port := Port}, Data}} ->
ArrivalTS = erlang:monotonic_time(),
State = handle_message(ArrivalTS, IP, Port, Data, State0),
handle_input(Socket, Info, Cnt + 1, Max, State);

{select, _SelectInfo} when Cnt == 0 ->
%% there must be something in the error queue
State = handle_err_input(Socket, State0),
handle_input(Socket, Info, Cnt + 1, Max, State);

{select, _SelectInfo} ->
{noreply, State0}
end.

handle_err_input(Socket, State) ->
case socket:recvmsg(Socket, [errqueue], 0) of
{ok, #{addr := #{addr := IP, port := Port}, ctrl := Ctrl}} ->
%% lists:foreach(handle_socket_error(_, IP, Port, State), Ctrl),
ok;
{error, timeout} ->
ok;
{error, ewouldblock} ->
ok;

Other ->
?LOG(error, "got unhandled error input: ~p", [Other])
end,
State.

handle_message(ArrivalTS, FromIP, FromPort, EncRequest,
#state{pending = Pending, mode = Mode, counter = Counter} = State) ->
case eradius_lib:decode_request_id(EncRequest) of
{ReqId, EncRequest} ->
case maps:find({FromIP, FromPort, ReqId}, Pending) of
error ->
%% discard reply because we didn't expect it
State;
{ok, WaitingSender} ->
WaitingSender ! {self(), response, ReqId, EncRequest},
NPending = maps:remove({FromIP, FromPort, ReqId}, Pending),
State#state{pending = NPending, counter = Counter - 1}
end;
{bad_pdu, _} ->
%% discard reply because it was malformed
State
end.
31 changes: 16 additions & 15 deletions src/eradius_counter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,26 @@ aggregate({Servers, {ResetTS, Nass}}) ->
%% @doc Set Value for the given prometheus boolean metric by the given Name with
%% the given values
set_boolean_metric(Name, Labels, Value) ->
case code:is_loaded(prometheus) of
{file, _} ->
%% case code:is_loaded(prometheus) of
%% {file, _} ->
try
prometheus_boolean:set(Name, Labels, Value)
catch _:_ ->
prometheus_boolean:declare([{name, server_status}, {labels, [server_ip, server_port]},
{help, "Status of an upstream RADIUS Server"}]),
prometheus_boolean:set(Name, Labels, Value)
end;
_ ->
ok
%% end;
%% _ ->
%% ok
end.

%% @doc Update the given histogram metric value
%% NOTE: We use prometheus_histogram collector here instead of eradius_counter ets table because
%% it is much easy to use histograms in this way. As we don't need to manage buckets and do
%% the other histogram things in eradius, but prometheus.erl will do it for us
observe(Name, {{ClientName, ClientIP, _}, {ServerName, ServerIP, ServerPort}} = MetricsInfo, Value, Help) ->
case code:is_loaded(prometheus) of
{file, _} ->
%% case code:is_loaded(prometheus) of
%% {file, _} ->
try
prometheus_histogram:observe(Name, [ServerIP, ServerPort, ServerName, ClientName, ClientIP], Value)
catch _:_ ->
Expand All @@ -114,13 +114,14 @@ observe(Name, {{ClientName, ClientIP, _}, {ServerName, ServerIP, ServerPort}} =
{duration_unit, milliseconds},
{buckets, Buckets}, {help, Help}]),
observe(Name, MetricsInfo, Value, Help)
end;
_ ->
ok
%% end;
%% _ ->
%% ok
end.

observe(Name, #nas_prop{server_ip = ServerIP, server_port = ServerPort, nas_ip = NasIP, nas_id = NasId} = Nas, Value, ServerName, Help) ->
case code:is_loaded(prometheus) of
{file, _} ->
%% case code:is_loaded(prometheus) of
%% {file, _} ->
try
prometheus_histogram:observe(Name, [inet:ntoa(ServerIP), ServerPort, ServerName, inet:ntoa(NasIP), NasId], Value)
catch _:_ ->
Expand All @@ -129,9 +130,9 @@ observe(Name, #nas_prop{server_ip = ServerIP, server_port = ServerPort, nas_ip =
{duration_unit, milliseconds},
{buckets, Buckets}, {help, Help}]),
observe(Name, Nas, Value, ServerName, Help)
end;
_ ->
ok
%% end;
%% _ ->
%% ok
end.

%% helper to be called from the aggregator to fetch this nodes values
Expand Down
4 changes: 4 additions & 0 deletions src/eradius_lib.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
-module(eradius_lib).
-compile([export_all, nowarn_export_all]).

-export([del_attr/2, get_attr/2, encode_request/1, encode_reply/1, decode_request/2, decode_request/3, decode_request_id/1]).
-export([random_authenticator/0, zero_authenticator/0, pad_to/2, set_attr/3, get_attributes/1, set_attributes/2]).
-export([timestamp/0, timestamp/1, printable_peer/2, make_addr_info/1]).
Expand Down Expand Up @@ -133,6 +135,8 @@ encode_eap_message(#radius_request{eap_msg = <<>>}, EncReq) ->
EncReq.

-spec encode_attributes(#radius_request{}, attribute_list()) -> {binary(), non_neg_integer()}.
encode_attributes(_Req, Attributes) when is_binary(Attributes) ->
{Attributes, byte_size(Attributes)};
encode_attributes(Req, Attributes) ->
F = fun ({A = #attribute{}, Val}, {Body, BodySize}) ->
EncAttr = encode_attribute(Req, A, Val),
Expand Down

0 comments on commit 53de024

Please sign in to comment.