Skip to content

Commit

Permalink
Vutils
Browse files Browse the repository at this point in the history
  • Loading branch information
vic4key committed Aug 31, 2024
1 parent 69cbe9f commit 137b315
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 43 deletions.
24 changes: 12 additions & 12 deletions Test/Sample.AsyncSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ void example_binding(const vu::Endpoint& endpoint)
printf("client %d closed\n", client.get_remote_sai().sin_port);
});

server.on(vu::AsyncSocket::SEND, [](vu::Socket& client) -> void
{
std::string s = "hello from server";
client.send(s.data(), int(s.size()));
printf("client %d send `%s`\n", client.get_remote_sai().sin_port, s.c_str());
});
//server.on(vu::AsyncSocket::SEND, [](vu::Socket& client) -> void
//{
// std::string s = "hello from server";
// client.send(s.data(), int(s.size()));
// printf("client %d send `%s`\n", client.get_remote_sai().sin_port, s.c_str());
//});

server.on(vu::AsyncSocket::RECV, [](vu::Socket& client) -> void
{
Expand Down Expand Up @@ -57,12 +57,12 @@ void example_inheritance(const vu::Endpoint& endpoint)
printf("client %d closed\n", client.get_remote_sai().sin_port);
}

virtual void on_send(vu::Socket& client)
{
std::string s = "hello from server";
client.send(s.data(), int(s.size()));
printf("client %d send `%s`\n", client.get_remote_sai().sin_port, s.c_str());
}
//virtual void on_send(vu::Socket& client)
//{
// std::string s = "hello from server";
// client.send(s.data(), int(s.size()));
// printf("client %d send `%s`\n", client.get_remote_sai().sin_port, s.c_str());
//}

virtual void on_recv(vu::Socket& client)
{
Expand Down
27 changes: 16 additions & 11 deletions include/Vutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,7 @@ class Socket : public LastError
VUResult vuapi accept(Handle& socket);

VUResult vuapi connect(const Endpoint& endpoint);
VUResult vuapi disconnect(const shutdowns_t flags = SD_BOTH, const bool cleanup = true);
VUResult vuapi disconnect(const shutdowns_t flags = SD_BOTH, const bool cleanup = false);

IResult vuapi send(const char* ptr_data, int size, const flags_t flags = MSG_NONE);
IResult vuapi send(const Buffer& data, const flags_t flags = MSG_NONE);
Expand Down Expand Up @@ -1376,11 +1376,11 @@ class AsyncSocket : public LastError

enum function : uint
{
CONNECT,
OPEN,
CLOSE,
RECV,
SEND,
CONNECT, // for only client side
OPEN, // for only server side
CLOSE, // for both server & client
RECV, // for both server & client
SEND, // for both server & client (should not use except required a special case)
UNDEFINED,
};

Expand All @@ -1404,12 +1404,16 @@ class AsyncSocket : public LastError
bool vuapi running() const;

/**
* for client side
* https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-wsaeventselect?redirectedfrom=MSDN#return-value
* Note: After connected (FD_CONNECT), client will be auto generated first event FD_WRITE.
*/
VUResult vuapi connect(const Endpoint& endpoint);
VUResult vuapi connect(const std::string& address, const ushort port);

/**
* for server side
*/
VUResult vuapi bind(const Endpoint& endpoint);
VUResult vuapi bind(const std::string& address, const ushort port);

Expand All @@ -1419,14 +1423,13 @@ class AsyncSocket : public LastError
*/
VUResult vuapi listen(const int maxcon = SOMAXCONN);

VUResult vuapi run();
VUResult vuapi run_in_thread();
VUResult vuapi run(const bool in_worker_thread = false);

VUResult vuapi stop();
IResult vuapi close();
IResult vuapi close(const Socket::shutdowns_t flags = SD_BOTH, const bool cleanup = false);

std::set<SOCKET> vuapi get_connections() const;
VUResult vuapi disconnect_connections(const Socket::shutdowns_t flags = SD_BOTH, const bool cleanup = true);
void vuapi get_connections(std::set<SOCKET>& connections);
VUResult vuapi disconnect_connections(const Socket::shutdowns_t flags = SD_BOTH, const bool cleanup = false);

IResult vuapi send(const SOCKET& connection, const char* ptr_data, int size, const Socket::flags_t flags = MSG_NONE);
IResult vuapi send(const SOCKET& connection, const Buffer& data, const Socket::flags_t flags = MSG_NONE);
Expand All @@ -1442,6 +1445,7 @@ class AsyncSocket : public LastError
protected:
void vuapi initialze();
VUResult vuapi loop();
VUResult vuapi run_loop();

IResult vuapi do_connect(WSANETWORKEVENTS& events, SOCKET& connection);
IResult vuapi do_open(WSANETWORKEVENTS& events, SOCKET& connection);
Expand All @@ -1455,6 +1459,7 @@ class AsyncSocket : public LastError
bool m_running;
DWORD m_n_events;
SOCKET m_connections[WSA_MAXIMUM_WAIT_EVENTS];
std::recursive_mutex m_mutex_client_list;
WSAEVENT m_events[WSA_MAXIMUM_WAIT_EVENTS];
fn_prototype_t m_functions[function::UNDEFINED];
std::mutex m_mutex;
Expand Down
65 changes: 46 additions & 19 deletions src/details/asyncsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ void vuapi AsyncSocket::initialze()
{
m_n_events = 0;

std::lock_guard<std::recursive_mutex> lg(m_mutex_client_list);

memset(m_connections, int(INVALID_SOCKET), sizeof(m_connections));
memset(m_events, int(0), sizeof(m_events));

Expand Down Expand Up @@ -95,6 +97,8 @@ VUResult vuapi AsyncSocket::listen(const int maxcon)
return 2;
}

std::lock_guard<std::recursive_mutex> lg(m_mutex_client_list);

m_connections[m_n_events] = m_socket.handle();
m_events[m_n_events] = event;
m_n_events++;
Expand All @@ -106,11 +110,11 @@ VUResult vuapi AsyncSocket::listen(const int maxcon)
return result;
}

IResult vuapi AsyncSocket::close()
IResult vuapi AsyncSocket::close(const Socket::shutdowns_t flags, const bool cleanup)
{
this->stop();

this->disconnect_connections();
this->disconnect_connections(flags, cleanup);

if (m_thread != INVALID_HANDLE_VALUE)
{
Expand Down Expand Up @@ -148,6 +152,8 @@ VUResult vuapi AsyncSocket::connect(const Endpoint& endpoint)
return 2;
}

std::lock_guard<std::recursive_mutex> lg(m_mutex_client_list);

auto result = m_socket.connect(endpoint);
if (result == VU_OK)
{
Expand All @@ -169,34 +175,37 @@ VUResult vuapi AsyncSocket::connect(const std::string& address, const ushort por
return this->connect(endpoint);
}

std::set<SOCKET> vuapi AsyncSocket::get_connections() const
void vuapi AsyncSocket::get_connections(std::set<SOCKET>& connections)
{
std::set<SOCKET> result;
connections.clear();

if (m_socket.available())
{
std::lock_guard<std::recursive_mutex> lg(m_mutex_client_list);

for (auto& socket : m_connections)
{
if (socket == INVALID_SOCKET)
if (socket == INVALID_SOCKET) // ignore invalid socket handle
{
continue;
}

if (m_side == side_type::SERVER && socket == m_socket.handle())
if (m_side == side_type::SERVER && socket == m_socket.handle()) // ignore server socket handle
{
continue;
}

result.insert(socket);
connections.insert(socket);
}
}

return result;
}

VUResult vuapi AsyncSocket::disconnect_connections(const Socket::shutdowns_t flags, const bool cleanup)
{
auto connections = this->get_connections();
std::lock_guard<std::recursive_mutex> lg(m_mutex_client_list);

std::set<SOCKET> connections;
this->get_connections(connections);
for (const auto& connection : connections)
{
Socket socket(m_socket);
Expand All @@ -217,14 +226,21 @@ static DWORD WINAPI AsyncSocket_Threading(LPVOID lpParam)
return 0;
}

VUResult vuapi AsyncSocket::run_in_thread()
VUResult vuapi AsyncSocket::run(const bool in_worker_thread)
{
m_thread = CreateThread(nullptr, 0, LPTHREAD_START_ROUTINE(AsyncSocket_Threading), this, 0, nullptr);
m_last_error_code = GetLastError();
return m_thread != INVALID_HANDLE_VALUE ? VU_OK : 1;
if (in_worker_thread)
{
m_thread = CreateThread(nullptr, 0, LPTHREAD_START_ROUTINE(AsyncSocket_Threading), this, 0, nullptr);
m_last_error_code = GetLastError();
return m_thread != INVALID_HANDLE_VALUE ? VU_OK : 1;
}
else
{
return this->run_loop();
}
}

VUResult vuapi AsyncSocket::run()
VUResult vuapi AsyncSocket::run_loop()
{
if (!m_socket.available())
{
Expand Down Expand Up @@ -349,6 +365,8 @@ IResult vuapi AsyncSocket::do_open(WSANETWORKEVENTS& events, SOCKET& connection)
return events.iErrorCode[FD_ACCEPT_BIT];
}

std::lock_guard<std::recursive_mutex> lg(m_mutex_client_list);

Socket::Handle obj = { 0 };
int n = static_cast<int>(sizeof(obj.sai));

Expand Down Expand Up @@ -380,6 +398,8 @@ IResult vuapi AsyncSocket::do_recv(WSANETWORKEVENTS& events, SOCKET& connection)
return events.iErrorCode[FD_READ_BIT];
}

std::lock_guard<std::recursive_mutex> lg(m_mutex_client_list);

Socket socket(m_socket);
socket.attach(connection);
this->on_recv(socket);
Expand All @@ -395,6 +415,8 @@ IResult vuapi AsyncSocket::do_send(WSANETWORKEVENTS& events, SOCKET& connection)
return events.iErrorCode[FD_WRITE_BIT];
}

std::lock_guard<std::recursive_mutex> lg(m_mutex_client_list);

Socket socket(m_socket);
socket.attach(connection);
this->on_send(socket);
Expand All @@ -405,10 +427,15 @@ IResult vuapi AsyncSocket::do_send(WSANETWORKEVENTS& events, SOCKET& connection)

IResult vuapi AsyncSocket::do_close(WSANETWORKEVENTS& events, SOCKET& connection)
{
if (events.iErrorCode[FD_CLOSE_BIT] != 0)
{
return events.iErrorCode[FD_CLOSE_BIT];
}
// TODO: In certain cases(e.g., user - mode drivers), it crashes.
// I'm not sure why, so temporarily comment out these codes.
//
// if (events.iErrorCode[FD_CLOSE_BIT] != 0)
// {
// return events.iErrorCode[FD_CLOSE_BIT];
// }

std::lock_guard<std::recursive_mutex> lg(m_mutex_client_list);

std::vector<std::pair<SOCKET, HANDLE>> in_used_connections;

Expand Down
2 changes: 1 addition & 1 deletion src/details/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ VUResult vuapi Socket::disconnect(const shutdowns_t flags, const bool cleanup)
return 1;
}

if (cleanup) // clean-up all remaining data in the socket
if (cleanup) // clean-up all remaining data in the socket (does not need with the SD_BOTH flag)
{
vu::Buffer temp;
this->recv_all(temp);
Expand Down

0 comments on commit 137b315

Please sign in to comment.