Skip to content

Commit

Permalink
FIX: Missed some changes for libfabric support
Browse files Browse the repository at this point in the history
  • Loading branch information
Craig Ulmer committed Nov 26, 2018
1 parent 0fb30c8 commit e9579e9
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 31 deletions.
3 changes: 3 additions & 0 deletions src/opbox/core/OpBoxCoreStandard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ int OpBoxCoreStandard::doUpdate(mailbox_t my_mailbox, Op *op, OpArgs *args){
switch(rc){
case WaitingType::done_and_destroy:
if(my_mailbox!=0){
dbg("Ending active op for mailbox "+to_string(my_mailbox));
endActiveOp(my_mailbox); //Handles delete of op
} else {
dbg("Immediate completion of op with assigned mailbox "+to_string(op->GetAssignedMailbox()));
}
break;

Expand Down
110 changes: 81 additions & 29 deletions src/opbox/net/libfabric_wrapper/fab_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
#include "opbox/common/Types.hh"


// Debug tracing hook. The macro currently expands to nothing, so every
// PRINTMSG(...) call site compiles away to an empty statement. The commented
// line below shows the fprintf expansion that turns the trace output back on.
#define PRINTMSG(fd_,...)
//fprintf(fd_,__VA_ARGS__)

using namespace std;
using namespace opbox;
Expand Down Expand Up @@ -211,7 +213,7 @@ fab_transport::eq_readerr(

rd = fi_eq_readerr (eq, &eq_err, 0);
if (rd != sizeof (eq_err)) {
fprintf (stderr, "ERROR: fi_eq_readerr\n");
// fprintf (stderr, "ERROR: fi_eq_readerr\n");
} else {
err_str = fi_eq_strerror (eq, eq_err.prov_errno, eq_err.err_data, NULL, 0);
fprintf (stderr, "%s: %d %s\n", eq_str, eq_err.err,
Expand Down Expand Up @@ -338,7 +340,7 @@ fab_transport::find_peer (

std::lock_guard < std::mutex > lock (conn_mutex);
if (peer_map.find (nodeid) != peer_map.end ()) {
peer = peer_map[nodeid]; // add to connection map
peer = peer_map[nodeid];
} else {
peer = NULL;
}
Expand Down Expand Up @@ -463,9 +465,9 @@ fab_transport::check_completion ()
return -1;
}
if (wc.flags & FI_RECV) {
// fprintf(stdout, "got a RECV completion - wc.flags=%X\n", wc.flags);
PRINTMSG(stdout, "got a RECV completion - wc.flags=%X\n", wc.flags);
if(wc.op_context != NULL) {
// cout << "successful recv - wc.flags=" << std::hex << wc.flags << std::dec << " length " << wc.len << std::endl;
PRINTMSG(stdout, "successful recv - wc.flags=%x length=%u\n", wc.flags, wc.len);
rreq=(fab_recvreq*)wc.op_context;
peer_t *sender = new peer_t(rreq->peer);
message_t *msg = (message_t*)((char*) rreq->repost_buf);
Expand All @@ -476,66 +478,66 @@ fab_transport::check_completion ()
delete sender;
sender = new peer_t(msg_src_peer);
}
// cout << "incoming message from " << rreq->peer->remote_nodeid.GetHex() << endl;
// cout << msg->str() << endl;
PRINTMSG(stdout, "incoming message from %s\n", rreq->peer->remote_nodeid.GetHex().c_str());
PRINTMSG(stdout, "%s\n", msg->str().c_str());
recv_cb_(sender, msg);
rc = fi_recv(rreq->peer->ep_addr, (char*) rreq->repost_buf, FAB_MTU_SIZE , fi_mr_desc(rreq->mr),
0, rreq);
rreq->peer->remote_addr, rreq);
}
} else if (wc.flags & FI_SEND) {
// fprintf(stdout, "got a SEND completion - wc.flags=%X\n", wc.flags);
PRINTMSG(stdout, "got a SEND completion - wc.flags=%X\n", wc.flags);
context = (struct fab_op_context*)wc.op_context;
OpArgs args(UpdateType::send_success);
if (context->user_cb) {
// fprintf(stdout, "invoking user callback\n");
PRINTMSG(stdout, "invoking user callback\n");
WaitingType cb_rc = context->user_cb(&args);
}
// fprintf(stdout, "check_completion - ldo value=%X\n", *(uint64_t*)((char *)context->ldo.GetDataPtr() + context->loffset));
PRINTMSG(stdout, "check_completion - ldo value=%X\n", *(uint64_t*)((char *)context->ldo.GetDataPtr() + context->loffset));
} else if (wc.flags == (FI_RMA | FI_READ)) {
// fprintf(stdout, "got a RMA+READ completion - wc.flags=%X\n", wc.flags);
PRINTMSG(stdout, "got a RMA+READ completion - wc.flags=%X\n", wc.flags);
context = (struct fab_op_context*)wc.op_context;
OpArgs args(UpdateType::get_success);
if (context->user_cb) {
// fprintf(stdout, "invoking user callback\n");
PRINTMSG(stdout, "invoking user callback\n");
WaitingType cb_rc = context->user_cb(&args);
}
// fprintf(stdout, "check_completion - ldo value=%X\n", *(uint64_t*)((char *)context->ldo.GetDataPtr() + context->loffset));
PRINTMSG(stdout, "check_completion - ldo value=%X\n", *(uint64_t*)((char *)context->ldo.GetDataPtr() + context->loffset));
} else if (wc.flags == (FI_RMA | FI_WRITE)) {
// fprintf(stdout, "got a RMA+WRITE completion - wc.flags=%X\n", wc.flags);
PRINTMSG(stdout, "got a RMA+WRITE completion - wc.flags=%X\n", wc.flags);
context = (struct fab_op_context*)wc.op_context;
OpArgs args(UpdateType::put_success);
if (context->user_cb) {
// fprintf(stdout, "invoking user callback\n");
PRINTMSG(stdout, "invoking user callback\n");
WaitingType cb_rc = context->user_cb(&args);
}
// fprintf(stdout, "check_completion - ldo value=%X\n", *(uint64_t*)((char *)context->ldo.GetDataPtr() + context->loffset));
PRINTMSG(stdout, "check_completion - ldo value=%X\n", *(uint64_t*)((char *)context->ldo.GetDataPtr() + context->loffset));
} else if (wc.flags == (FI_ATOMIC | FI_READ)) {
// fprintf(stdout, "got a ATOMIC+READ completion - wc.flags=%X\n", wc.flags);
PRINTMSG(stdout, "got a ATOMIC+READ completion - wc.flags=%X\n", wc.flags);
context = (struct fab_op_context*)wc.op_context;
OpArgs args(UpdateType::atomic_success);
if (context->user_cb) {
// fprintf(stdout, "invoking user callback\n");
PRINTMSG(stdout, "invoking user callback\n");
WaitingType cb_rc = context->user_cb(&args);
}
// fprintf(stdout, "check_completion - ldo value=%X\n", *(uint64_t*)((char *)context->ldo.GetDataPtr() + context->loffset));
PRINTMSG(stdout, "check_completion - ldo value=%X\n", *(uint64_t*)((char *)context->ldo.GetDataPtr() + context->loffset));
} else if (wc.flags == (FI_ATOMIC | FI_WRITE)) {
// fprintf(stdout, "got a ATOMIC+WRITE completion - wc.flags=%X\n", wc.flags);
PRINTMSG(stdout, "got a ATOMIC+WRITE completion - wc.flags=%X\n", wc.flags);
context = (struct fab_op_context*)wc.op_context;
OpArgs args(UpdateType::atomic_success);
if (context->user_cb) {
// fprintf(stdout, "invoking user callback\n");
PRINTMSG(stdout, "invoking user callback\n");
WaitingType cb_rc = context->user_cb(&args);
}
// fprintf(stdout, "check_completion - ldo value=%X\n", *(uint64_t*)((char *)context->ldo.GetDataPtr() + context->loffset));
PRINTMSG(stdout, "check_completion - ldo value=%X\n", *(uint64_t*)((char *)context->ldo.GetDataPtr() + context->loffset));
} else if (wc.flags == FI_ATOMIC) {
// fprintf(stdout, "got a ATOMIC completion - wc.flags=%X\n", wc.flags);
PRINTMSG(stdout, "got a ATOMIC completion - wc.flags=%X\n", wc.flags);
context = (struct fab_op_context*)wc.op_context;
OpArgs args(UpdateType::atomic_success);
if (context->user_cb) {
// fprintf(stdout, "invoking user callback\n");
PRINTMSG(stdout, "invoking user callback\n");
WaitingType cb_rc = context->user_cb(&args);
}
// fprintf(stdout, "check_completion - ldo value=%X\n", *(uint64_t*)((char *)context->ldo.GetDataPtr() + context->loffset));
PRINTMSG(stdout, "check_completion - ldo value=%X\n", *(uint64_t*)((char *)context->ldo.GetDataPtr() + context->loffset));
} else {
fprintf(stdout, "got completion with unknown flags - wc.flags=%X\n", wc.flags);
}
Expand Down Expand Up @@ -626,7 +628,7 @@ fab_transport::create_rdm_connection_server(
(char*)rreq->repost_buf,
FAB_MTU_SIZE,
fi_mr_desc(rreq->mr),
0,
peer->remote_addr,
rreq);
}
std::lock_guard < std::mutex > lock (conn_mutex);
Expand All @@ -642,7 +644,6 @@ fab_transport::create_rdm_connection_server(
// cout << "gni_server_connection sent result" << results.str() << "\n";
}


int
fab_transport::fab_init_rdm(const char *provider_name)
{
Expand Down Expand Up @@ -726,6 +727,9 @@ fab_transport::fab_init_rdm(const char *provider_name)
webhook::Server::registerHook("/fab/rdmlookup", [this] (const map<string,string> &args, stringstream &results) {
create_rdm_connection_server(args, results);
});
webhook::Server::registerHook("/fab/disconnect", [this] (const map<string,string> &args, stringstream &results) {
destroy_rdm_connection_server(args, results);
});
//cout << "Fab init RDM done " << std::endl;
initialized = true;

Expand Down Expand Up @@ -787,7 +791,7 @@ fab_transport::create_rdm_connection_client(
rreq->mr= rcvbuf->mr;
// print_fab_recvreq(rreq);
ret = fi_recv(ep, (char*)rcvbuf->buf + offset, FAB_MTU_SIZE , fi_mr_desc(rcvbuf->mr),
0, rreq);
peer->remote_addr, rreq);
}
if (peer != NULL) {
peer_map[peer_nodeid] = peer;
Expand Down Expand Up @@ -1110,7 +1114,7 @@ fab_transport::ib_server_conn ()

if (rd < 0) {
eq_readerr(eq, errstr);
fprintf(stderr, "ib_server_conn() - rd=%d, event=%d, errstr=%s, errno=%d (%s)\n", rd, event, errstr, errno, strerror(errno));
// fprintf(stderr, "ib_server_conn() - rd=%d, event=%d, errstr=%s, errno=%d (%s)\n", rd, event, errstr, errno, strerror(errno));
continue;
}

Expand Down Expand Up @@ -1266,6 +1270,54 @@ fab_transport::stop_connection_thread(void)
/* End of Infiniband related methods */


int
fab_transport::disconnect(struct fab_peer *peer)
{
int ret;
string result;
stringstream ss_path;

ss_path << "/fab/disconnect&rem_webhook_hostname="<<mynodeid.GetIP()<<"&rem_webhook_port="<<mynodeid.GetPort();
ret = webhook::retrieveData(peer->remote_nodeid.GetIP() , peer->remote_nodeid.GetPort() , ss_path.str(), &result);

auto victim = peer_map.find(peer->remote_nodeid);
if (victim != peer_map.end()) {
peer_map.erase(peer->remote_nodeid);
ret = fi_av_remove(av, &peer->remote_addr, 1, 0);
} else {
abort();
}
return 0;
}

void
fab_transport::destroy_rdm_connection_server(
const std::map<std::string,std::string> &args,
std::stringstream &results)
{
int ret;
string hostname, port;

auto new_val = args.find("rem_webhook_hostname");
if(new_val != args.end()){
hostname = new_val->second;
}
new_val = args.find("rem_webhook_port");
if(new_val != args.end()){
port = new_val->second;
}
faodel::nodeid_t remote_nodeid(hostname, port);

auto victim = peer_map.find(remote_nodeid);
if (victim != peer_map.end()) {
fab_peer *peer = peer_map[remote_nodeid];
peer_map.erase(remote_nodeid);
ret = fi_av_remove(av, &peer->remote_addr, 1, 0);
} else {
abort();
}
}

void
fab_transport::Send(
fab_peer *remote_peer,
Expand Down
2 changes: 2 additions & 0 deletions src/opbox/net/libfabric_wrapper/fab_transport.hh
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public:
void stop_progress_thread(void);
void start_ib_connection_thread(void);
void stop_connection_thread(void);
int disconnect(struct fab_peer *peer);
int check_completion();
int fab_post_recv (struct fid_ep *ep, void *buf, struct fid_mr *mr, int size, int n);
int init_endpoint(fid_ep *ep);
Expand All @@ -135,6 +136,7 @@ public:
fab_peer* create_rdm_connection_client (faodel::nodeid_t peer_nodeid);
void create_rdm_connection_server(const std::map<std::string,std::string> &args, std::stringstream &results);
int fab_init_rdm(const char *provider_name);
void destroy_rdm_connection_server(const std::map<std::string,std::string> &args, std::stringstream &results);

// net layer calls
void Send(
Expand Down
4 changes: 2 additions & 2 deletions src/opbox/net/libfabric_wrapper/libfabric_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ int
Disconnect(
peer_ptr_t peer)
{
return 0;
return fabtrns->disconnect(peer->p);
}

int
Expand All @@ -392,7 +392,7 @@ Disconnect(
if (p == nullptr) {
return -1;
}
return Disconnect((peer_ptr_t)p);
return fabtrns->disconnect(p);
}

/*
Expand Down

0 comments on commit e9579e9

Please sign in to comment.