/* $Source: bitbucket.org:berkeleylab/gasnet.git/ofi-conduit/gasnet_ofi.c $ * Description: GASNet libfabric (OFI) conduit Implementation * Copyright 2002, Dan Bonachea * Copyright 2015-2017, Intel Corporation * Portions copyright 2018-2020, The Regents of the University of California. * Terms of use are as specified in license.txt */ #define GASNETI_NEED_GASNET_MK_H 1 #include #include #include #include #include #include #include #include #include #include #include #if HAVE_SYS_UIO_H #include /* For struct iovec */ #endif GASNETI_IDENT(gasnetc_IdentString_Providers, "$GASNetSupportedOFIProviders: " GASNETC_OFI_PROVIDER_LIST " $"); struct fid_fabric* gasnetc_ofi_fabricfd; struct fid_domain* gasnetc_ofi_domainfd; struct fid_av* gasnetc_ofi_avfd; struct fid_cq* gasnetc_ofi_tx_cqfd; // CQ, ideally for both AM and RDMA tx ops #if GASNETC_OFI_USE_MULTI_CQ struct fid_cq* gasnetc_ofi_reqtx_cqfd = NULL; // CQ for AM Request tx ops, IFF cannot share struct fid_cq* gasnetc_ofi_reptx_cqfd = NULL; // CQ for AM Request tx ops, IFF cannot share #endif struct fid_ep* gasnetc_ofi_rdma_epfd; struct fid_ep* gasnetc_ofi_request_epfd; struct fid_ep* gasnetc_ofi_reply_epfd; struct fid_cq* gasnetc_ofi_request_cqfd; struct fid_cq* gasnetc_ofi_reply_cqfd; #if GASNET_SEGMENT_EVERYTHING struct fid_mr* gasnetc_segment_mrfd = NULL; #endif struct fid_mr* gasnetc_auxseg_mrfd = NULL; size_t gasnetc_ofi_bbuf_threshold; #ifdef FI_MR_ENDPOINT static int gasnetc_fi_mr_endpoint = 0; #endif #if GASNET_HAVE_MK_CLASS_MULTIPLE static int gasnetc_fi_hmem = 0; #endif size_t gasnetc_ofi_max_medium = GASNETC_OFI_MAX_MEDIUM_DFLT; typedef struct gasnetc_ofi_recv_metadata { struct iovec iov; struct fi_msg am_buff_msg; gasnetc_ofi_recv_ctxt_t am_buff_ctxt; } gasnetc_ofi_recv_metadata_t; #define USE_AV_MAP 0 // Must match order of fi_getname() calls in ofi_exchange_addresses(), // where this is enforced via static assertions. 
enum { GASNETC_FADDR_IDX_REQ = 0, GASNETC_FADDR_IDX_REP, GASNETC_FADDR_IDX_RDMA, NUM_OFI_ENDPOINTS }; #define GASNETC_FABRIC_ADDR_OFFSET(idx, jobrank) \ ((idx) + (jobrank)*NUM_OFI_ENDPOINTS) #if USE_AV_MAP static addr_table_t *addr_table; #endif // TODO: multi-ep with independent resources will require rewriting this GASNETI_INLINE(gasnetc_fabric_addr_inner) GASNETI_PURE fi_addr_t gasnetc_fabric_addr_inner(int idx, gex_Rank_t jobrank) { #if USE_AV_MAP return (fi_addr_t)(addr_table->table[GASNETC_FABRIC_ADDR_OFFSET(idx, jobrank)]); #else return (fi_addr_t)GASNETC_FABRIC_ADDR_OFFSET(idx, jobrank); #endif } GASNETI_PUREP(gasnetc_fabric_addr_inner) #define gasnetc_fabric_addr(type, jobrank) \ gasnetc_fabric_addr_inner(GASNETC_FADDR_IDX_##type, jobrank) #define SCALABLE_NOT_AUTO_DETECTED (-1) static short has_mr_scalable = SCALABLE_NOT_AUTO_DETECTED; #ifdef GASNETC_OFI_HAS_MR_SCALABLE #define GASNETC_OFI_HAS_MR_SCALABLE_STATIC 1 #else // cast prevents erroneous use in preprocessor directives #define GASNETC_OFI_HAS_MR_SCALABLE ((short)has_mr_scalable) #endif // Alias unless/until the properties are split #define GASNETC_OFI_HAS_MR_PROV_KEY (!GASNETC_OFI_HAS_MR_SCALABLE) #define GASNETC_OFI_HAS_MR_VIRT_ADDR (!GASNETC_OFI_HAS_MR_SCALABLE) // Table of remote registration keys, used only when GASNETC_OFI_HAS_MR_PROV_KEY // (e.g. FI_MR_BASIC). Otherwise NULL. // The leading dimension is EP index, with indices in [-1, GASNET_MAXEPS) to // place the aux segment keys as index -1, and the second dimension is jobrank. // The leading dimension is allocated at startup, and the rest is allocated // lazily when the first entry for a given EP index is received. 
// TODO: scalable storage static uint64_t** gasnetc_remote_key_tbl; static size_t tx_cq_size = 0; static size_t rx_cq_size = 0; // Determine if (jobrank,addr) is in the aux segment registration GASNETI_INLINE(gasnetc_in_auxseg) GASNETI_PURE int gasnetc_in_auxseg(gex_Rank_t jobrank, void *addr) { #if GASNET_SEGMENT_FAST || GASNET_SEGMENT_LARGE // Reminder that with unsigned types, a negative offset is a very large positive value uintptr_t offset = (uintptr_t)addr - (uintptr_t)gasneti_seginfo_aux[jobrank].addr; return offset < gasneti_seginfo_aux[jobrank].size; #else // For SEGMENT_EVERYTHING there is only ever a single memory registration. // So the addrres is NEVER "in the aux segment registration". return 0; #endif } GASNETI_PUREP(gasnetc_in_auxseg) // Lookup or compute correct key for RDMA // NOTE: rem_epidx has type int (not gex_EP_Index_t) to allow -1 to name the aux seg GASNETI_INLINE(gasnetc_remote_key) GASNETI_PURE uint64_t gasnetc_remote_key(gex_Rank_t jobrank, int rem_epidx) { #if GASNET_SEGMENT_FAST || GASNET_SEGMENT_LARGE gasneti_assert_int(rem_epidx ,>=, -1); gasneti_assert_int(rem_epidx ,<, GASNET_MAXEPS); if (GASNETC_OFI_HAS_MR_PROV_KEY) { gasneti_assert(gasnetc_remote_key_tbl[rem_epidx]); return gasnetc_remote_key_tbl[rem_epidx][jobrank]; } else { return GASNETC_EPIDX_TO_KEY(rem_epidx); } #else // For SEGMENT_EVERYTHING there is a single host memory registration gasneti_assert_int(rem_epidx ,>=, 0); gasneti_assert_int(rem_epidx ,<, GASNET_MAXEPS); return GASNETC_EPIDX_TO_KEY(0); #endif } GASNETI_PUREP(gasnetc_remote_key) // Lookup correct "address" (which may be an offset) for RDMA // NOTE: rem_epidx has type int (not gex_EP_Index_t) to allow -1 to name the aux seg GASNETI_INLINE(gasnetc_remote_addr) GASNETI_PURE uintptr_t gasnetc_remote_addr(gex_Rank_t jobrank, void *addr, int rem_epidx) { #if GASNET_SEGMENT_FAST || GASNET_SEGMENT_LARGE gasneti_assert_int(rem_epidx ,>=, -1); gasneti_assert_int(rem_epidx ,<, GASNET_MAXEPS); if 
(GASNETC_OFI_HAS_MR_VIRT_ADDR) { // BASIC uses the virtual address return (uintptr_t)addr; } else { // SCALABLE uses an offset rather than virtual address int in_auxseg = (rem_epidx < 0); gasnet_seginfo_t *si = in_auxseg ? gasneti_seginfo_aux : gasneti_seginfo_tbl[rem_epidx]; gasneti_assert(si); return (uintptr_t)addr - (uintptr_t)si[jobrank].addr; } #else gasneti_assert_int(rem_epidx ,>=, 0); gasneti_assert_int(rem_epidx ,<, GASNET_MAXEPS); // SCALABLE uses an offset, but base address is zero for EVERYTHING return (uintptr_t)addr; #endif } GASNETI_PUREP(gasnetc_remote_addr) // Statements which launch a fi_write or fi_read, setting "ret" #define OFI_RMA(rw, c_ep, loc_addr, nbytes, jobrank, rem_epidx, rem_addr, ctxt_ptr, aux) \ do { \ fi_addr_t _peer = gasnetc_fabric_addr(RDMA, jobrank); \ uintptr_t _addr = gasnetc_remote_addr(jobrank, rem_addr, rem_epidx); \ uint64_t _key = gasnetc_remote_key(jobrank, rem_epidx); \ void *_op_ctxt = gasnetc_rdma_ctxt_to_op_ctxt(ctxt_ptr, aux); \ void *_desc = gasneti_i_segment_kind_is_host(c_ep->_segment) ? NULL: fi_mr_desc(c_ep->mrfd); \ struct fid_ep *_ofi_ep = gasnetc_ofi_rdma_epfd; /* TODO: ep isolation */ \ ret = fi_##rw(_ofi_ep, loc_addr, nbytes, _desc, _peer, _addr, _key, _op_ctxt); \ } while(0) #define OFI_WRITE(c_ep, src_addr, nbytes, jobrank, rem_epidx, dest_addr, ctxt_ptr, aux) \ OFI_RMA(write, c_ep, src_addr, nbytes, jobrank, rem_epidx, dest_addr, ctxt_ptr, aux) #define OFI_READ(c_ep, dest_addr, nbytes, jobrank, rem_epidx, src_addr, ctxt_ptr, aux) \ OFI_RMA(read, c_ep, dest_addr, nbytes, jobrank, rem_epidx, src_addr, ctxt_ptr, aux) /* Poll periodically on RMA injection to ensure efficient progress. 
* This is a data race, but it is safe because polling here is not needed for
 * correctness; it simply improves performance in case of floods of RMA requests.
 * rdma_poll_frequency counts RMA injections since the last such poll. */
static int rdma_poll_frequency = 0;
/* Injections between polls: set from GASNET_OFI_RMA_POLL_FREQ (default 32)
 * by gasnetc_ofi_read_env_vars() */
static int rdma_periodic_poll_threshold;

// Count one RMA injection; when the pre-increment count has reached
// rdma_periodic_poll_threshold, reset the count and call gasnetc_ofi_tx_poll().
#define PERIODIC_RMA_POLL() do{\
    if_pf(rdma_poll_frequency++ >= rdma_periodic_poll_threshold){\
        rdma_poll_frequency=0;\
        gasnetc_ofi_tx_poll();\
    }} while(0)

/* In this case, inject means "inject into the network". It is not specific
 * to the fi_inject/fi_inject_write functions. fxn must assign a return value
 * to an int named "ret" in the scope in which it is called.
 * fxn is evaluated under 'lock' via GASNETC_OFI_LOCK_EXPR; while it yields
 * -FI_EAGAIN, the macro polls (as selected by poll_type) and evaluates it again. */
#define OFI_INJECT_RETRY(lock, fxn, poll_type)\
    do {\
        GASNETC_OFI_LOCK_EXPR(lock, fxn);\
        GASNETI_SPIN_WHILE(ret == -FI_EAGAIN, {\
            GASNETC_OFI_POLL_SELECTIVE(poll_type);\
            GASNETC_OFI_LOCK_EXPR(lock, fxn);\
        });\
    }while(0)

// As above, except that if 'imm' is non-zero, then will jump to 'label'
// if first attempt returns -FI_EAGAIN (no polling is done in that case)
// TODO: support poll if GASNETC_IMMEDIATE_AMPOLLS?
#define OFI_INJECT_RETRY_IMM(lock, fxn, poll_type, imm, label)\
    do {\
        GASNETC_OFI_LOCK_EXPR(lock, fxn);\
        if (ret == -FI_EAGAIN) {\
            if (imm) goto label; \
            GASNETI_SPIN_DOWHILE(ret == -FI_EAGAIN, {\
                GASNETC_OFI_POLL_SELECTIVE(poll_type);\
                GASNETC_OFI_LOCK_EXPR(lock, fxn);\
            });\
        } \
    }while(0)

// LIFO free lists; going by their names they hold AM request buffers,
// AM reply buffers, bounce buffers, and bounce-buffer contexts.
// NOTE(review): contents inferred from names; population happens elsewhere.
static gasneti_lifo_head_t ofi_am_request_pool = GASNETI_LIFO_INITIALIZER;
static gasneti_lifo_head_t ofi_am_reply_pool = GASNETI_LIFO_INITIALIZER;
static gasneti_lifo_head_t ofi_bbuf_pool = GASNETI_LIFO_INITIALIZER;
static gasneti_lifo_head_t ofi_bbuf_ctxt_pool = GASNETI_LIFO_INITIALIZER;

// Multi-receive buffer count and size: set by gasnetc_ofi_read_env_vars() from
// GASNET_OFI_NUM_RECEIVE_BUFFS and GASNET_OFI_RECEIVE_BUFF_SIZE.
static size_t num_multirecv_buffs;
static size_t multirecv_buff_size;

static void* receive_region_start = NULL;
static size_t receive_region_size = 0;

/* Variables for bounce buffering of non-blocking, non-bulk puts.
 * The related gasnetc_ofi_bbuf_threshold variable is needed in other files,
 * so it is defined with external linkage near the top of this file (and,
 * per the original note, declared in gasnet_ofi.h) rather than here. */
static void* bounce_region_start = NULL;
static size_t bounce_region_size = 0;
// Set from GASNET_OFI_NUM_BBUFS / GASNET_OFI_BBUF_SIZE by gasnetc_ofi_read_env_vars()
static size_t ofi_num_bbufs;
static size_t ofi_bbuf_size;
// gasnetc_ofi_read_env_vars() rejects settings where threshold/size exceeds this
#define OFI_MAX_NUM_BOUNCE_BUFFERS 32

static void* am_buffers_region_start = NULL;
static size_t am_buffers_region_size = 0;

static gasneti_semaphore_t num_unallocated_request_buffers;
static gasneti_semaphore_t num_unallocated_reply_buffers;

// Maximum and initial AM buffer counts: set by gasnetc_ofi_read_env_vars() from
// GASNET_OFI_{MAX,NUM_INITIAL}_{REQUEST,REPLY}_BUFFS
static size_t max_am_request_buffs = 0;
static size_t max_am_reply_buffs = 0;
static size_t num_init_am_request_buffs = 0;
static size_t num_init_am_reply_buffs = 0;

// Set from GASNET_OFI_LONG_AM_RMA_THRESH by gasnetc_ofi_read_env_vars()
static size_t long_rma_threshold = 0;

static uint64_t max_buffered_send;
static uint64_t max_buffered_write;
// Largest ofi-level AM message we can receive (header + max payload, aligned);
// computed by gasnetc_ofi_read_env_vars()
static uint64_t min_multi_recv;

static int using_psm_provider = 0;
// Value of GASNET_OFI_DEVICE, or NULL if unset/empty
static char *gasnetc_ofi_device = NULL;
// Space-separated provider names in order of preference (see gasnetc_ofi_getinfo())
static const char *supported_providers = GASNETC_OFI_PROVIDER_LIST;
static int gasnetc_high_perf_prov = 0;
static char *gasnetc_ofi_provider = NULL;
static char *gasnetc_ofi_domain = NULL;

static struct fi_info *gasnetc_rma_info;
static struct fi_info *gasnetc_msg_info;

gasneti_spawnerfn_t const *gasneti_spawner = NULL;

static gasnetc_ofi_recv_metadata_t* metadata_array;

/* Being able to see if there are pending operations that are not
 * completing is useful for debugging purposes */
#if GASNET_DEBUG
static gasnetc_paratomic_t pending_rdma = gasnetc_paratomic_init(0);
static gasnetc_paratomic_t pending_am = gasnetc_paratomic_init(0);
#endif

static int gasnetc_ofi_inited = 0;

// OFI_CONDUIT_VERSION: API version to request in fi_getinfo()
// NOTE: we do NOT blindly chase the latest to avoid nasty surprises.
//
// FI_MR_HMEM and corresponding fields in `struct fi_mr_attr` first
// appear in API version 1.9.  So, memory kinds needs >= 1.9.
//
// FI_MR_{SCALABLE,BASIC} are deprecated since 1.5.
// Some newer providers don't support them.
// So, use API version >= 1.5 if possible.
#if GASNET_HAVE_MK_CLASS_MULTIPLE // No need to check MAJOR,MINOR against FI_VERSION(1, 9) since configure has verified version 1.11+. #define OFI_CONDUIT_VERSION FI_VERSION(1, 9) #elif FI_VERSION(FI_MAJOR_VERSION, FI_MINOR_VERSION) >= FI_VERSION(1, 5) #define OFI_CONDUIT_VERSION FI_VERSION(1, 5) #else #define OFI_CONDUIT_VERSION FI_VERSION(1, 0) #endif #if GASNET_PSHM #define gasnetc_AMPSHMPoll(repliesOnly) gasneti_AMPSHMPoll(repliesOnly GASNETI_THREAD_PASS) #else #define gasnetc_AMPSHMPoll(repliesOnly) ((void)0) #endif #define GASNETC_OFI_POLL_EVERYTHING() do { gasnetc_ofi_poll(); gasnetc_AMPSHMPoll(0);} while (0) #define GASNETC_OFI_POLL_SELECTIVE(type) do {\ if (type == OFI_POLL_ALL) {\ gasnetc_ofi_am_recv_poll_cold(1);\ gasnetc_AMPSHMPoll(0);\ }\ else {\ gasnetc_AMPSHMPoll(1);\ }\ gasnetc_ofi_am_recv_poll_cold(0);\ gasnetc_ofi_tx_poll();\ }while(0) #if GASNET_PAR static inline int gasnetc_is_exiting(void) { gasneti_sync_reads(); return gasnetc_exit_in_progress; } #define gasnetc_is_exit_error(e) \ (gasnetc_is_exiting() && ((e).err == FI_SUCCESS || (e).err == FI_ECANCELED || (e).err == EACCES)) #else #define gasnetc_is_exit_error(e) 0 #endif // Conversion between conduit's AM context types and operation context // The "to" operations include type checking that simple casts would not GASNETI_INLINE(gasnetc_send_ctxt_to_op_ctxt) void *gasnetc_send_ctxt_to_op_ctxt(gasnetc_ofi_send_ctxt_t *p) { return &p->ctxt; } GASNETI_INLINE(gasnetc_op_ctxt_to_send_ctxt) gasnetc_ofi_send_ctxt_t *gasnetc_op_ctxt_to_send_ctxt(void *p) { return gasneti_container_of(p, gasnetc_ofi_send_ctxt_t, ctxt); } GASNETI_INLINE(gasnetc_recv_ctxt_to_op_ctxt) void *gasnetc_recv_ctxt_to_op_ctxt(gasnetc_ofi_recv_ctxt_t *p) { return &p->ctxt; } GASNETI_INLINE(gasnetc_op_ctxt_to_recv_ctxt) gasnetc_ofi_recv_ctxt_t *gasnetc_op_ctxt_to_recv_ctxt(void *p) { return gasneti_container_of(p, gasnetc_ofi_recv_ctxt_t, ctxt); } // We reserve low 2 bits to hold per-operation "aux" data for RDMA callbacks // 
Assumes pointers have at least 4-bytes alignment in structs #define GASNETC_RDMA_CTXT_MASK (~(uintptr_t)3) // Conversion from conduit's RDMA contexts to fi operation context. // This is for use with gasnetc_ofi_{nb,bounce,blocking}_op_ctxt_t, // where we use the callback function as the operation context. // The callbacks perform the reverse using gasneti_container_of(). GASNETI_INLINE(gasnetc_rdma_ctxt_to_op_ctxt_inner) GASNETT_PURE void *gasnetc_rdma_ctxt_to_op_ctxt_inner(void *p, unsigned int aux) { uintptr_t raw = (uintptr_t)p; gasneti_assert(0 == (raw & ~GASNETC_RDMA_CTXT_MASK)); gasneti_assert(0 == (aux & GASNETC_RDMA_CTXT_MASK)); return (void *)(raw | aux); } GASNETT_PUREP(gasnetc_rdma_ctxt_to_op_ctxt_inner) #define gasnetc_rdma_ctxt_to_op_ctxt(p,aux) \ gasnetc_rdma_ctxt_to_op_ctxt_inner(&(p)->callback,aux) // Convert fi operation context to rdma callback function and call GASNETI_INLINE(gasnetc_op_ctxt_run_rdma_callback) void gasnetc_op_ctxt_run_rdma_callback(void *ctxt) { uintptr_t raw = (uintptr_t)ctxt; void *ptr = (void*)(raw & GASNETC_RDMA_CTXT_MASK); unsigned int aux = (raw & ~GASNETC_RDMA_CTXT_MASK); gasnetc_rdma_callback_fn callback = *(gasnetc_rdma_callback_fn *)ptr; callback(ptr, aux); } /*------------------------------------------------- * Function Declarations *-------------------------------------------------*/ GASNETI_INLINE(gasnetc_ofi_handle_am) void gasnetc_ofi_handle_am(gasnetc_ofi_am_send_buf_t *header, int isreq, size_t msg_len, uint64_t cq_data); void gasnetc_ofi_am_send_complete(gasnetc_ofi_send_ctxt_t *header); void gasnetc_ofi_tx_poll(); GASNETI_INLINE(gasnetc_ofi_am_recv_poll) void gasnetc_ofi_am_recv_poll(int is_request); void gasnetc_ofi_am_recv_poll_cold(int is_request) { // non-inline wrapper to avoid forced inlining on "cold" paths gasnetc_ofi_am_recv_poll(is_request); } GASNETI_NEVER_INLINE(gasnetc_fi_cq_readerr, // this wrapper silences a warning on gcc 4.8.5 ssize_t gasnetc_fi_cq_readerr(struct fid_cq *cq, struct 
fi_cq_err_entry *buf, uint64_t flags)) { return fi_cq_readerr(cq, buf, flags); } // ofi-conduit should not be considered "portable" when using // a high-performance provider (unless used w/ inappropriate h/w) int gasnetc_check_portable_conduit(void) { gasneti_assert(gasnetc_ofi_inited); if (! strcmp(gasnetc_ofi_provider, "verbs;ofi_rxm")) { // extension of bug 3609: some verbs-compatible networks need special handling // TODO: warn specifically about the right providers if (!strncmp(gasnetc_ofi_domain, "hfi1_", 5)) return 1; // psm2 if (!strncmp(gasnetc_ofi_domain, "qib", 3)) return 1; // psm } return !gasnetc_high_perf_prov; } // Reads any user-provided settings from the environment to avoid clogging up // the gasnetc_ofi_init() function with this code. // Runs after provier and domain selection to allow for provider-specific defaults static void gasnetc_ofi_read_env_vars(const char *provider, const char *domain) { const char* am_max_medium_env = "GASNET_OFI_MAX_MEDIUM"; const char* max_am_request_buffs_env = "GASNET_OFI_MAX_REQUEST_BUFFS"; const char* max_am_reply_buffs_env = "GASNET_OFI_MAX_REPLY_BUFFS"; const char* num_init_request_buffs_env = "GASNET_OFI_NUM_INITIAL_REQUEST_BUFFS"; const char* num_init_reply_buffs_env = "GASNET_OFI_NUM_INITIAL_REPLY_BUFFS"; const char* max_err_string = "%s must be greater than or equal to\n" "%s, which is set to %d in this run.\n"; const char* init_err_string = "%s must be greater than or equal to 2.\n"; // Maxiumum size of an AM Medium payload gasnetc_ofi_max_medium = gasneti_getenv_int_withdefault(am_max_medium_env, GASNETC_OFI_MAX_MEDIUM_DFLT, 1); if (gasnetc_ofi_max_medium < 512) { gasneti_fatalerror("%s setting (%lu) is below the minimum value of 512.", am_max_medium_env, (unsigned long)gasnetc_ofi_max_medium); } // Maximum and initial number of buffers to allocate for AM Requests max_am_request_buffs = gasneti_getenv_int_withdefault(max_am_request_buffs_env, 1024, 0); size_t dflt = MIN(256, max_am_request_buffs); 
num_init_am_request_buffs = gasneti_getenv_int_withdefault(num_init_request_buffs_env, dflt, 0); if (num_init_am_request_buffs < 2) { gasneti_fatalerror(init_err_string, num_init_request_buffs_env); } if (max_am_request_buffs < num_init_am_request_buffs) { gasneti_fatalerror(max_err_string, max_am_request_buffs_env, num_init_request_buffs_env, (int)num_init_am_request_buffs); } // Maximum and initial number of buffers to allocate for AM Replies max_am_reply_buffs = gasneti_getenv_int_withdefault(max_am_reply_buffs_env, 1024, 0); dflt = MIN(256, max_am_reply_buffs); num_init_am_reply_buffs = gasneti_getenv_int_withdefault(num_init_reply_buffs_env, dflt, 0); if (num_init_am_reply_buffs < 2) { gasneti_fatalerror(init_err_string, num_init_reply_buffs_env); } if (max_am_reply_buffs < num_init_am_reply_buffs) { gasneti_fatalerror(max_err_string, max_am_reply_buffs_env, num_init_reply_buffs_env, (int)num_init_am_reply_buffs); } /* The number of RMA requests to be issued before a tx_poll takes place */ rdma_periodic_poll_threshold = gasneti_getenv_int_withdefault("GASNET_OFI_RMA_POLL_FREQ", 32, 0); ofi_num_bbufs = gasneti_getenv_int_withdefault("GASNET_OFI_NUM_BBUFS", 64, 0); ofi_bbuf_size = gasneti_getenv_int_withdefault("GASNET_OFI_BBUF_SIZE", GASNET_PAGESIZE, 1); gasnetc_ofi_bbuf_threshold = gasneti_getenv_int_withdefault("GASNET_OFI_BBUF_THRESHOLD", 4*ofi_bbuf_size, 1); if (ofi_num_bbufs < gasnetc_ofi_bbuf_threshold/ofi_bbuf_size) gasneti_fatalerror("The number of bounce buffers must be greater than or equal to the bounce\n" "buffer threshold divided by the bounce buffer size. See the ofi-conduit README.\n"); if (gasnetc_ofi_bbuf_threshold/ofi_bbuf_size > OFI_MAX_NUM_BOUNCE_BUFFERS) { gasneti_fatalerror("The ofi-conduit limits the max number of bounce buffers used in the non-blocking\n" "put path to %d. Your selections for the bounce buffer tuning parameters exceed this. 
If you\n" "truly need more than %d bounce buffers, edit the OFI_MAX_NUM_BOUNCE_BUFFERS macro in\n" "gasnet_ofi.c and recompile.\n", OFI_MAX_NUM_BOUNCE_BUFFERS, OFI_MAX_NUM_BOUNCE_BUFFERS); } // What is the largest ofi-level message we can receive? min_multi_recv = MAX(offsetof(gasnetc_ofi_am_send_buf_t, buf.long_buf.data), offsetof(gasnetc_ofi_am_send_buf_t, buf.medium_buf.data)) + OFI_AM_MAX_DATA_LENGTH; min_multi_recv = GASNETI_ALIGNUP(min_multi_recv, GASNETI_MEDBUF_ALIGNMENT); const char* num_multirecv_buffs_env = "GASNET_OFI_NUM_RECEIVE_BUFFS"; const char* multirecv_size_env = "GASNET_OFI_RECEIVE_BUFF_SIZE"; char *env_val = gasneti_strdup(gasnet_getenv(multirecv_size_env)); for (size_t i = 0; i < strlen(env_val); ++i) env_val[i] = toupper(env_val[i]); if (! strcmp("SINGLE", env_val)) { const char *value = gasneti_dynsprintf("SINGLE => %d", (int)min_multi_recv); gasneti_envstr_display(multirecv_size_env, value, 0); multirecv_buff_size = min_multi_recv; // See Bug 4478 for information leading to the selection of 450 as a default. 
// TODO: at least consider scaling this with PPN num_multirecv_buffs = gasneti_getenv_int_withdefault(num_multirecv_buffs_env, 450, 0); } else { multirecv_buff_size = gasneti_getenv_int_withdefault(multirecv_size_env, 1024*1024, 1); num_multirecv_buffs = gasneti_getenv_int_withdefault(num_multirecv_buffs_env, 8, 0); } gasneti_free(env_val); if (num_multirecv_buffs < 2) gasneti_fatalerror("%s must be at least 2.\n", num_multirecv_buffs_env); if (multirecv_buff_size < min_multi_recv) { gasneti_fatalerror("%s must be at least %d bytes on this build.\n" "This is the size of the largest AM, including its message header.\n", \ multirecv_size_env, (int)min_multi_recv); } const char* long_rma_threshold_env = "GASNET_OFI_LONG_AM_RMA_THRESH"; long_rma_threshold = gasneti_getenv_int_withdefault(long_rma_threshold_env, OFI_AM_MAX_DATA_LENGTH, 1); if (long_rma_threshold > OFI_AM_MAX_DATA_LENGTH) { gasneti_fatalerror( "The value given for %s exceeds the amount\n" "of data which can be packed into a medium message (%d bytes on this build).\n" "Use a lower value or reconfigure GASNet for a bigger medium message size using\n" "--with-ofi-max-medium=.\n", long_rma_threshold_env, (int)OFI_AM_MAX_DATA_LENGTH); } tx_cq_size = gasneti_getenv_int_withdefault("GASNET_OFI_TX_CQ_SIZE", 0, 0); rx_cq_size = gasneti_getenv_int_withdefault("GASNET_OFI_RX_CQ_SIZE", 0, 0); } /* The intention of separating this logic from gasnetc_ofi_init() is * to contain the complexity of supporting scalable endpoints in the future to * this function and the relevant get-address macros. 
*/ static void ofi_setup_address_vector() { int ret = FI_SUCCESS; conn_entry_t *mapped_table; struct fi_av_attr av_attr = {0}; /* Open Address Vector and bind the AV to the domain */ #if USE_AV_MAP av_attr.type = FI_AV_MAP; addr_table = (addr_table_t*)gasneti_malloc(gasneti_nodes * NUM_OFI_ENDPOINTS * sizeof(conn_entry_t) + sizeof(addr_table_t)); addr_table->size = gasneti_nodes * NUM_OFI_ENDPOINTS; mapped_table = addr_table->table; #else av_attr.type = FI_AV_TABLE; mapped_table = NULL; #endif ret = fi_av_open(gasnetc_ofi_domainfd, &av_attr, &gasnetc_ofi_avfd, NULL); GASNETC_OFI_CHECK_RET(ret, "fi_av_open failed"); /* Bind AV to endpoints, both RDMA/AM endpoints share the same AV object */ ret = fi_ep_bind(gasnetc_ofi_rdma_epfd, &gasnetc_ofi_avfd->fid, 0); GASNETC_OFI_CHECK_RET(ret, "fi_ep_bind for avfd to rdma_epfd failed"); ret = fi_ep_bind(gasnetc_ofi_request_epfd, &gasnetc_ofi_avfd->fid, 0); GASNETC_OFI_CHECK_RET(ret, "fi_ep_bind for avfd to am request epfd failed"); ret = fi_ep_bind(gasnetc_ofi_reply_epfd, &gasnetc_ofi_avfd->fid, 0); GASNETC_OFI_CHECK_RET(ret, "fi_ep_bind for avfd to am reply epfd failed"); } static void ofi_exchange_addresses() { size_t reqnamelen = 0, repnamelen = 0, rdmanamelen = 0; char* on_node_addresses; int ret = FI_SUCCESS; /* Query each endpoint for its address length. While in most cases, these * lengths will be equal, there are some cases where they might not be. For * example, when using both IPv4 and IPv6. 
*/ ret = fi_getname(&gasnetc_ofi_request_epfd->fid, NULL, &reqnamelen); gasneti_assert(ret == -FI_ETOOSMALL); ret = fi_getname(&gasnetc_ofi_reply_epfd->fid, NULL, &repnamelen); gasneti_assert(ret == -FI_ETOOSMALL); ret = fi_getname(&gasnetc_ofi_rdma_epfd->fid, NULL, &rdmanamelen); gasneti_assert(ret == -FI_ETOOSMALL); size_t total_len = reqnamelen + repnamelen + rdmanamelen; on_node_addresses = gasneti_malloc(total_len); char* alladdrs = gasneti_malloc(gasneti_nodes*total_len); char* p = on_node_addresses; gasneti_static_assert(GASNETC_FADDR_IDX_REQ == 0); ret = fi_getname(&gasnetc_ofi_request_epfd->fid, p, &reqnamelen); GASNETC_OFI_CHECK_RET(ret, "fi_getname failed for the AM request endpoint"); p += reqnamelen; gasneti_static_assert(GASNETC_FADDR_IDX_REP == 1); ret = fi_getname(&gasnetc_ofi_reply_epfd->fid, p, &repnamelen); GASNETC_OFI_CHECK_RET(ret, "fi_getname failed for the AM reply endpoint"); p += repnamelen; gasneti_static_assert(GASNETC_FADDR_IDX_RDMA == 2); ret = fi_getname(&gasnetc_ofi_rdma_epfd->fid, p, &rdmanamelen); GASNETC_OFI_CHECK_RET(ret, "fi_getname failed for the RDMA endpoint"); p += rdmanamelen; gasneti_assert_ptr(p ,==, on_node_addresses + total_len); gasneti_bootstrapExchange(on_node_addresses, total_len, alladdrs); /* NOTE: If AV_MAP is ever to be supported, the NULL in the below call needs to be * changed to point to the AV structure. */ ret = fi_av_insert(gasnetc_ofi_avfd, alladdrs, gasneti_nodes*NUM_OFI_ENDPOINTS, NULL ,0ULL, NULL); if (gasneti_nodes*NUM_OFI_ENDPOINTS != ret) gasneti_fatalerror("fi_av_insert failed. Expected: %d Actual: %d\n", gasneti_nodes*NUM_OFI_ENDPOINTS, ret); gasneti_free(alladdrs); gasneti_free(on_node_addresses); } static struct fi_info *gasnetc_ofi_getinfo(struct fi_info *hints) { struct fi_info *info = NULL; int ret = fi_getinfo(OFI_CONDUIT_VERSION, NULL, NULL, 0ULL, hints, &info); if (FI_SUCCESS != ret) { return NULL; } // Find the first entry for the most-preferred provider offered, if any. 
const char *q = supported_providers; while (*q) { while (*q == ' ') ++q; const char *r = strchr(q, ' '); int len = r ? r - q : strlen(q); char prov_name[64]; strncpy(prov_name, q, len); prov_name[len] = '\0'; for (struct fi_info *p = info; p; p = p->next) { if (!strcmp(p->fabric_attr->prov_name, prov_name)) { return p; } } q += len; } return info; // caller will notice the wrong provider } // Utility function to set an environment variable with proper tracing/logging // Returns zero if the variable was already set, non-zero otherwise. int gasnetc_setenv_string(const char *key, const char *val, int replace) { char *prev; if (!replace && NULL != (prev = gasneti_getenv(key))) { gasneti_envstr_display(key, prev, 0); GASNETI_TRACE_PRINTF(I, ("Not overwriting %s in environment", key)); return 0; } else { GASNETI_TRACE_PRINTF(I, ("Setting %s='%s' in environment", key, val)); gasneti_envstr_display(key, val, 1); gasneti_setenv(key, val); return 1; } } // Wrapper for case of gasnetc_setenv_string for an unsigned int key int gasnetc_setenv_uint(const char *key, unsigned int val, int replace) { char valstr[16]; snprintf(valstr, sizeof(valstr), "%u", val); return gasnetc_setenv_string(key, valstr, replace); } // Helper for (large) page-aligned allocation #include // for MAP_FAILED static void *gasnetc_alloc_pages(size_t len, const char *desc) { void *result = gasneti_mmap(GASNETI_PAGE_ALIGNUP(len)); if (MAP_FAILED == result) { gasneti_fatalerror("Failed to allocate %"PRIuSZ " bytes %s", len, desc); } return result; } /*------------------------------------------------ * Initialize OFI conduit * ----------------------------------------------*/ int gasnetc_ofi_init(void) { int ret = GASNET_OK; int result = GASNET_ERR_NOT_INIT; struct fi_info *hints, *info; struct fi_cq_attr tx_cq_attr = {0}; struct fi_cq_attr rx_cq_attr = {0}; size_t optval; int num_locks; int i; // Ensure uniform FI_* env vars gasneti_propagate_env("FI_", GASNETI_PROPAGATE_ENV_PREFIX); // Ensure uniform MLX5_* 
env vars for verbs provider // Especially important for work-arounds like MLX5_SCATTER_TO_CQE gasneti_propagate_env("MLX5_", GASNETI_PROPAGATE_ENV_PREFIX); // TODO: other providers? #if GASNETC_OFI_USE_THREAD_DOMAIN && GASNET_PAR gasneti_spinlock_init(&gasnetc_ofi_locks.big_lock); #elif GASNET_PAR /* This lock is needed in PAR mode to protect the AM reference counting */ gasneti_spinlock_init(&gasnetc_ofi_locks.rx_request_cq); gasneti_spinlock_init(&gasnetc_ofi_locks.rx_reply_cq); #endif #if 0 gasneti_spinlock_init(&gasnetc_ofi_locks.tx_cq); gasneti_spinlock_init(&gasnetc_ofi_locks.am_rx); gasneti_spinlock_init(&gasnetc_ofi_locks.am_tx); gasneti_spinlock_init(&gasnetc_ofi_locks.rdma_rx); gasneti_spinlock_init(&gasnetc_ofi_locks.rdma_tx); #endif /* OFI initialization */ /* Alloc hints*/ hints = fi_allocinfo(); if (!hints) gasneti_fatalerror("fi_allocinfo for hints failed\n"); // constrain the device/domain if provided by the user gasnetc_ofi_device = gasneti_getenv_hwloc_withdefault("GASNET_OFI_DEVICE", "", "Socket"); if (!strlen(gasnetc_ofi_device)) gasnetc_ofi_device = NULL; hints->domain_attr->name = gasnetc_ofi_device; /* caps: fabric interface capabilities */ hints->caps = FI_RMA | FI_MSG | FI_MULTI_RECV; #if GASNET_HAVE_MK_CLASS_MULTIPLE hints->caps |= FI_HMEM; #endif /* mode: convey requirements for application to use fabric interfaces */ hints->mode = FI_CONTEXT; /* fi_context is used for per operation context parameter */ /* addr_format: expected address format for AV/CM calls */ hints->addr_format = FI_FORMAT_UNSPEC; hints->tx_attr->op_flags = FI_DELIVERY_COMPLETE; hints->ep_attr->type = FI_EP_RDM; /* Reliable datagram */ /* Threading mode is set by the configure script to FI_THREAD_DOMAIN if * using the psm2 provider and FI_THREAD_SAFE otherwise*/ #if GASNETC_OFI_USE_THREAD_DOMAIN || !GASNET_PAR hints->domain_attr->threading = FI_THREAD_DOMAIN; #else hints->domain_attr->threading = FI_THREAD_SAFE; #endif hints->domain_attr->control_progress = 
FI_PROGRESS_MANUAL; /* resource_mgmt: FI_RM_ENABLED - provider protects against overrunning local and remote resources. */ hints->domain_attr->resource_mgmt = FI_RM_ENABLED; /* av_type: type of address vectores that are usable with this domain */ hints->domain_attr->av_type = FI_AV_TABLE; /* type AV index */ #if OFI_CONDUIT_VERSION >= FI_VERSION(1, 5) // These are basically FI_MR_BASIC decomposed: hints->domain_attr->mr_mode = FI_MR_ALLOCATED | FI_MR_VIRT_ADDR | FI_MR_PROV_KEY | FI_MR_ENDPOINT; #else /* If the configure script detected a provider's mr_mode, then force * ofi to use that mode. */ switch(GASNETC_OFI_HAS_MR_SCALABLE) { case 0: hints->domain_attr->mr_mode = FI_MR_BASIC; break; case SCALABLE_NOT_AUTO_DETECTED: hints->domain_attr->mr_mode = FI_MR_UNSPEC; break; default: hints->domain_attr->mr_mode = FI_MR_SCALABLE; } #endif #if GASNET_HAVE_MK_CLASS_MULTIPLE hints->domain_attr->mr_mode |= FI_MR_HMEM; #endif // Setup various environment variables quite early, before the provider may // have been determined. This is necessary because fi_getinfo() may read them. // NOTE: spawn via an ofi-based MPI may have read these even earlier! #if 0 // Disabled pending bug 4413 // Provider-independent: gasnetc_setenv_uint("FI_UNIVERSE_SIZE", gasneti_nodes, 1); #endif // PSM2 provider: // In libfabric v1.6, the psm2 provider transitioned to using separate // psm2 endpoints for each ofi endpoint, whereas in the past all communication // was multiplexed over a single psm2 endpoint. Setting this variable ensures // that unnecessary connections between remote endpoints which never communicate // are not made, which can cause slow tear-down. int set_psm2_lazy_conn = 0; if (FI_VERSION(FI_MAJOR_VERSION, FI_MINOR_VERSION) >= FI_VERSION(1, 6)) { set_psm2_lazy_conn = gasnetc_setenv_string("FI_PSM2_LAZY_CONN", "1", 1); } // CXI provider: // To handle bursty AM traffic, enable hybrid receive mode with reasonable default parameters. 
// If FI_CXI_RX_MATCH_MODE is already set, we make NO changes (or risk an inconsistent mess). int set_cxi_match_mode = gasnetc_setenv_string("FI_CXI_RX_MATCH_MODE", "hybrid", 0); if (set_cxi_match_mode) { gasnetc_setenv_string("FI_CXI_RDZV_THRESHOLD", "256", 1); gasnetc_setenv_string("FI_CXI_RDZV_GET_MIN", "256", 1); } // else: warning deferred until provider selection confirms use of CXI info = gasnetc_ofi_getinfo(hints); if (!info) { #if GASNET_HAVE_MK_CLASS_MULTIPLE hints->caps ^= FI_HMEM; // retry w/o FI_HMEM info = gasnetc_ofi_getinfo(hints); if (info) { // fall through // gasnetc_fi_hmem will get set to zero later, leading to // deferred failure in the first call to gex_MK_Create(), if any } else #endif GASNETI_RETURN_ERRR(RESOURCE, "No OFI providers found that could support the OFI conduit"); } // Balk if provider was explicitly chosen at configure time and is not available now if (!strchr(supported_providers,' ') && strcmp(supported_providers, info->fabric_attr->prov_name)) { if (gasnetc_ofi_device) { // Retry to rule out invalid device choice hints->domain_attr->name = NULL; info = gasnetc_ofi_getinfo(hints); if (info && !strcmp(supported_providers, info->fabric_attr->prov_name)) { gasneti_fatalerror("Specifed device '%s' is not available or not usable", gasnetc_ofi_device); } } char *envvar = gasneti_getenv("FI_PROVIDER"); gasneti_fatalerror( "OFI provider '%s' selected at configure time is not available at run time%s%s%s.", supported_providers, envvar ? " and/or has been overridden by FI_PROVIDER='" : "", envvar ? envvar : "", envvar ? 
"' in the environment" : ""); } // Check if this provider is one we consider "high performance" const char *high_perf_providers[] = { "psm2", "cxi", "verbs;ofi_rxm" }; for (i = 0; i < sizeof(high_perf_providers)/sizeof(high_perf_providers[0]); ++i) { if (!strcmp(info->fabric_attr->prov_name, high_perf_providers[i])) { gasnetc_high_perf_prov = 1; break; } } // psm2 provider needs some special handling if (!strcmp(info->fabric_attr->prov_name, "psm2")){ using_psm_provider = 1; } else if (set_psm2_lazy_conn) { /* If we set this variable and are not using psm2, unset it in the * unlikely case that another library in the current application will * use ofi/psm2 */ unsetenv("FI_PSM2_LAZY_CONN"); } if (!strcmp(info->fabric_attr->prov_name, "cxi") && !set_cxi_match_mode) { gasneti_console0_message("WARNING", "ofi-conduit failed to configure FI_CXI_* " "environment variables due to prior conflicting settings. " "This may lead to unstable behavior and/or degraded " "performance. If you did not intentionally set " "FI_CXI_RX_MATCH_MODE, then this condition may have resulted " "from initializing MPI prior to initialization of GASNet. " "For more information on that scenario, please see \"Limits " "to MPI interoperability\" in the ofi-conduit README. "); } int quiet = gasneti_getenv_yesno_withdefault("GASNET_QUIET", 0); #if GASNET_PAR if (!using_psm_provider && GASNETC_OFI_USE_THREAD_DOMAIN) { const char * msg = "Using OFI provider \"%s\" when the ofi-conduit was configured for FI_THREAD_DOMAIN\n" "(possibly because the psm or psm2 provider was detected at configure time). In GASNET_PAR mode,\n" "this has the effect of using a global lock instead of fine-grained locking. 
If this causes \n" "undesirable performance in PAR, reconfigure GASNet using: --with-ofi-provider=%s --disable-thread-domain"; if (quiet) GASNETI_TRACE_PRINTF(I,(msg, info->fabric_attr->prov_name, info->fabric_attr->prov_name)); else gasneti_console0_message("WARNING", msg, info->fabric_attr->prov_name, info->fabric_attr->prov_name); } #endif if (!gasnetc_high_perf_prov) { const char * msg = "Using OFI provider (%s), which has not been validated to provide\n" " WARNING: acceptable GASNet performance. You should consider using a more\n" " WARNING: hardware-appropriate GASNet conduit. See ofi-conduit/README."; if (quiet) GASNETI_TRACE_PRINTF(I,(msg, info->fabric_attr->prov_name)); else gasneti_console0_message("WARNING", msg, info->fabric_attr->prov_name); } #if OFI_CONDUIT_VERSION >= FI_VERSION(1, 5) // When using 1.5 mr_mode logic, we *currently* expect the three mode bits to be // set or clear as a group, and conflate them as "BASIC" (set) vs "SCALABLE" (clear). // TODO: multi-segment support will render FI_MR_PROV_KEY irrelevant // TODO: FI_MR_ALLOCATED is only relevant to EVERYTHING support. 
has_mr_scalable = !(info->domain_attr->mr_mode & FI_MR_VIRT_ADDR); gasneti_assert_always_uint(has_mr_scalable ,==, !(info->domain_attr->mr_mode & FI_MR_ALLOCATED)); gasneti_assert_always_uint(has_mr_scalable ,==, !(info->domain_attr->mr_mode & FI_MR_PROV_KEY)); gasnetc_fi_mr_endpoint = (info->domain_attr->mr_mode & FI_MR_ENDPOINT); #else has_mr_scalable = (info->domain_attr->mr_mode == FI_MR_SCALABLE); #endif #if GASNETC_OFI_HAS_MR_SCALABLE_STATIC if (GASNETC_OFI_HAS_MR_SCALABLE != has_mr_scalable) { gasneti_fatalerror("The statically-determined value for GASNETC_OFI_HAS_MR_SCALABLE=%i does\n" " not match the memory registration support that the (%s) provider reported.\n" " This could happen if a provider that previously only supported FI_MR_BASIC\n" " added support for FI_MR_SCALABLE, or if the wrong provider was selected at runtime.\n" " Use configure option --%s-ofi-mr-scalable to correct this.", GASNETC_OFI_HAS_MR_SCALABLE, info->fabric_attr->prov_name, (has_mr_scalable ? "enable" : "disable")); } #endif #if GASNET_SEGMENT_EVERYTHING if (!GASNETC_OFI_HAS_MR_SCALABLE) { gasneti_fatalerror("GASNET_SEGMENT_EVERYTHING is not supported when using FI_MR_BASIC.\n" "Pick an OFI provider that supports FI_MR_SCALABLE if EVERYTHING\n" "is needed.\n"); } #endif #if GASNET_HAVE_MK_CLASS_MULTIPLE gasnetc_fi_hmem = !!(info->caps & FI_HMEM); #endif /* Open the fabric provider */ ret = fi_fabric(info->fabric_attr, &gasnetc_ofi_fabricfd, NULL); GASNETC_OFI_CHECK_RET(ret, "fi_fabric failed"); GASNETI_TRACE_PRINTF(I, ("Opened provider '%s' version %u.%u", info->fabric_attr->prov_name, (unsigned int)FI_MAJOR(info->fabric_attr->prov_version), (unsigned int)FI_MINOR(info->fabric_attr->prov_version))); gasneti_leak( gasnetc_ofi_provider = gasneti_strdup(info->fabric_attr->prov_name) ); /* Open a fabric access domain, also referred to as a resource domain */ ret = fi_domain(gasnetc_ofi_fabricfd, info, &gasnetc_ofi_domainfd, NULL); GASNETC_OFI_CHECK_RET(ret, "fi_domain failed"); 
GASNETI_TRACE_PRINTF(I, ("Opened domain '%s'", info->domain_attr->name)); gasneti_leak( gasnetc_ofi_domain = gasneti_strdup(info->domain_attr->name) ); // Now read user-provided environment settings gasnetc_ofi_read_env_vars(info->fabric_attr->prov_name, info->domain_attr->name); /* The intention here is to ensure that subsequent calls to fi_getinfo() * won't ever give us a different provider. * This is necessary when more than one provider matches the other hints, * and the first match is not the one we want. */ hints->fabric_attr->prov_name = gasnetc_ofi_provider; hints->domain_attr->name = gasnetc_ofi_domain; fi_freeinfo(info); /* Allocate a new active endpoint for RDMA operations */ hints->caps = FI_RMA; #if GASNET_HAVE_MK_CLASS_MULTIPLE if (gasnetc_fi_hmem) { hints->caps |= FI_HMEM; } #endif hints->mode = 0; // in particular we do not support FI_CONTEXT due to many-to-one iop ret = fi_getinfo(OFI_CONDUIT_VERSION, NULL, NULL, 0ULL, hints, &gasnetc_rma_info); GASNETC_OFI_CHECK_RET(ret, "fi_getinfo() failed querying for RMA endpoint"); ret = fi_endpoint(gasnetc_ofi_domainfd, gasnetc_rma_info, &gasnetc_ofi_rdma_epfd, NULL); GASNETC_OFI_CHECK_RET(ret, "fi_endpoint for rdma failed"); GASNETI_TRACE_PRINTF(I,("RMA EP: max_msg_size=%"PRIuSZ" inject_size=%"PRIuSZ" rma_iov_limit=%"PRIuSZ, gasnetc_rma_info->ep_attr->max_msg_size, gasnetc_rma_info->tx_attr->inject_size, gasnetc_rma_info->tx_attr->rma_iov_limit)); // Maximum size to use for RMA with FI_INJECT { const char *env_var = "GASNET_OFI_RMA_INJECT_LIMIT"; uint64_t dflt = gasnetc_rma_info->tx_attr->inject_size; uint64_t value = gasneti_getenv_int_withdefault(env_var, dflt, 1); if (value > dflt) { // enforce dflt as the maximum if (!gasneti_mynode) { gasneti_console_message("WARNING", "%s reduced from the requested value %"PRIu64 " to the maximum supported value %"PRIu64, env_var, value, dflt); } value = dflt; } max_buffered_write = value; } GASNETI_TRACE_PRINTF(I, ("Max buffered write size is %"PRIu64, 
max_buffered_write)); /* Allocate a new active endpoint for AM operations buffer */ hints->caps = FI_MSG | FI_MULTI_RECV; hints->mode = FI_CONTEXT; ret = fi_getinfo(OFI_CONDUIT_VERSION, NULL, NULL, 0ULL, hints, &gasnetc_msg_info); GASNETC_OFI_CHECK_RET(ret, "fi_getinfo() failed querying for MSG endpoints"); ret = fi_endpoint(gasnetc_ofi_domainfd, gasnetc_msg_info, &gasnetc_ofi_request_epfd, NULL); GASNETC_OFI_CHECK_RET(ret, "fi_endpoint for am request endpoint failed"); ret = fi_endpoint(gasnetc_ofi_domainfd, gasnetc_msg_info, &gasnetc_ofi_reply_epfd, NULL); GASNETC_OFI_CHECK_RET(ret, "fi_endpoint for am reply endpoint failed"); GASNETI_TRACE_PRINTF(I,("MSG EP: max_msg_size=%"PRIuSZ" inject_size=%"PRIuSZ" iov_limit=%"PRIuSZ, gasnetc_msg_info->ep_attr->max_msg_size, gasnetc_msg_info->tx_attr->inject_size, gasnetc_msg_info->tx_attr->iov_limit)); // Maximum size to use for fi_inject { const char *new_env_var = "GASNET_OFI_AM_INJECT_LIMIT"; const char *old_env_var = "GASNET_OFI_INJECT_LIMIT"; // Perfer the new variable name unless only the legacy one is set const char *env_var = (gasneti_getenv(new_env_var) || !gasneti_getenv(old_env_var)) ? 
new_env_var : old_env_var; uint64_t dflt = gasnetc_msg_info->tx_attr->inject_size; uint64_t value = gasneti_getenv_int_withdefault(env_var, dflt, 1); if (value > dflt) { // enforce dflt as the maximum if (!gasneti_mynode) { gasneti_console_message("WARNING", "%s reduced from the requested value %"PRIu64 " to the maximum supported value %"PRIu64, env_var, value, dflt); } value = dflt; } max_buffered_send = value; } GASNETI_TRACE_PRINTF(I, ("Max buffered send size is %"PRIu64, max_buffered_send)); // Allocate a CQ that will ideally be shared for both RDMA and AM tx ops memset(&tx_cq_attr, 0, sizeof(tx_cq_attr)); tx_cq_attr.format = FI_CQ_FORMAT_DATA; /* Provides data associated with a completion */ tx_cq_attr.size = tx_cq_size; tx_cq_attr.wait_obj = FI_WAIT_NONE; ret = fi_cq_open(gasnetc_ofi_domainfd, &tx_cq_attr, &gasnetc_ofi_tx_cqfd, NULL); GASNETC_OFI_CHECK_RET(ret, "fi_cq_open for tx_cqfd failed"); /* Allocate recv completion queues for AMs */ memset(&rx_cq_attr, 0, sizeof(rx_cq_attr)); rx_cq_attr.format = FI_CQ_FORMAT_DATA; rx_cq_attr.size = rx_cq_size; rx_cq_attr.wait_obj = FI_WAIT_NONE; ret = fi_cq_open(gasnetc_ofi_domainfd, &rx_cq_attr, &gasnetc_ofi_request_cqfd, NULL); GASNETC_OFI_CHECK_RET(ret, "fi_cq_open for am request cq failed"); ret = fi_cq_open(gasnetc_ofi_domainfd, &rx_cq_attr, &gasnetc_ofi_reply_cqfd, NULL); GASNETC_OFI_CHECK_RET(ret, "fi_cq_open for am reply cq failed"); /* Bind CQs to endpoints */ ret = fi_ep_bind(gasnetc_ofi_rdma_epfd, &gasnetc_ofi_tx_cqfd->fid, FI_TRANSMIT | FI_RECV); GASNETC_OFI_CHECK_RET(ret, "fi_ep_bind for tx_cq to rdma_epfd failed"); ret = fi_ep_bind(gasnetc_ofi_request_epfd, &gasnetc_ofi_tx_cqfd->fid, FI_TRANSMIT); #if GASNETC_OFI_USE_MULTI_CQ if (ret == -FI_EINVAL) { // Provider doesn't want to let us share CQ GASNETI_TRACE_PRINTF(I, ("Allocating distinct reqtx_cqfd")); ret = fi_cq_open(gasnetc_ofi_domainfd, &tx_cq_attr, &gasnetc_ofi_reqtx_cqfd, NULL); GASNETC_OFI_CHECK_RET(ret, "fi_cq_open for reqtx_cqfd failed"); ret = 
fi_ep_bind(gasnetc_ofi_request_epfd, &gasnetc_ofi_reqtx_cqfd->fid, FI_TRANSMIT); } #endif GASNETC_OFI_CHECK_RET(ret, "fi_ep_bind for tx_cq to am request CQ failed"); ret = fi_ep_bind(gasnetc_ofi_reply_epfd, &gasnetc_ofi_tx_cqfd->fid, FI_TRANSMIT); #if GASNETC_OFI_USE_MULTI_CQ if (ret == -FI_EINVAL) { // Provider doesn't want to let us share CQ GASNETI_TRACE_PRINTF(I, ("Allocating distinct reptx_cqfd")); ret = fi_cq_open(gasnetc_ofi_domainfd, &tx_cq_attr, &gasnetc_ofi_reptx_cqfd, NULL); GASNETC_OFI_CHECK_RET(ret, "fi_cq_open for reptx_cqfd failed"); ret = fi_ep_bind(gasnetc_ofi_reply_epfd, &gasnetc_ofi_reptx_cqfd->fid, FI_TRANSMIT); } #endif GASNETC_OFI_CHECK_RET(ret, "fi_ep_bind for tx_cq to am reply CQ failed"); ret = fi_ep_bind(gasnetc_ofi_request_epfd, &gasnetc_ofi_request_cqfd->fid, FI_RECV); GASNETC_OFI_CHECK_RET(ret, "fi_ep_bind for am request cq to am_request_epfd failed"); ret = fi_ep_bind(gasnetc_ofi_reply_epfd, &gasnetc_ofi_reply_cqfd->fid, FI_RECV); GASNETC_OFI_CHECK_RET(ret, "fi_ep_bind for am reply cq to am_reply_epfd failed"); /* Low-water mark for shared receive buffer */ GASNETI_TRACE_PRINTF(I, ("Setting multi-recv low-water mark to %"PRIuSZ, min_multi_recv)); optval = min_multi_recv; ret = fi_setopt(&gasnetc_ofi_request_epfd->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV, &optval, sizeof(optval)); GASNETC_OFI_CHECK_RET(ret, "fi_setopt for am request epfd failed"); gasneti_assert_uint(optval ,==, min_multi_recv); // documented as IN ret = fi_setopt(&gasnetc_ofi_reply_epfd->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV, &optval, sizeof(optval)); GASNETC_OFI_CHECK_RET(ret, "fi_setopt for am reply epfd failed"); gasneti_assert_uint(optval ,==, min_multi_recv); // documented as IN ofi_setup_address_vector(); /* Enable endpoints */ ret = fi_enable(gasnetc_ofi_rdma_epfd); GASNETC_OFI_CHECK_RET(ret, "fi_enable for rdma failed"); ret = fi_enable(gasnetc_ofi_request_epfd); GASNETC_OFI_CHECK_RET(ret, "fi_enable for am request ep failed"); ret = 
fi_enable(gasnetc_ofi_reply_epfd); GASNETC_OFI_CHECK_RET(ret, "fi_enable for am reply ep failed"); ofi_exchange_addresses(); // Don't allow libfabric to free() our strings hints->domain_attr->name = NULL; hints->fabric_attr->prov_name = NULL; fi_freeinfo(hints); if (GASNETC_OFI_HAS_MR_PROV_KEY) { // 'static' ensures valgrind does not consider this allocation "possibly lost" static uint64_t **tmp; tmp = gasneti_calloc(1+GASNET_MAXEPS, sizeof(uint64_t*)); gasnetc_remote_key_tbl = tmp + 1; // places aux seg keys at gasnetc_remote_key_tbl[-1] } receive_region_size = multirecv_buff_size*num_multirecv_buffs; receive_region_start = gasnetc_alloc_pages(receive_region_size, "for multi-recv buffers"); metadata_array = gasneti_malloc(sizeof(gasnetc_ofi_recv_metadata_t)*num_multirecv_buffs); { char valstr[16]; gasneti_format_number(multirecv_buff_size*num_multirecv_buffs, valstr, sizeof(valstr), 1); GASNETI_TRACE_PRINTF(I, ("Allocated %s for %"PRIuSZ " multi-recv buffers", valstr, num_multirecv_buffs)); } for(i = 0; i < num_multirecv_buffs; i++) { gasnetc_ofi_recv_metadata_t* metadata = metadata_array + i; metadata->iov.iov_base = ((char*)receive_region_start) + multirecv_buff_size*i; metadata->iov.iov_len = multirecv_buff_size; metadata->am_buff_msg.msg_iov = &metadata->iov; metadata->am_buff_msg.iov_count = 1; metadata->am_buff_msg.addr = FI_ADDR_UNSPEC; metadata->am_buff_msg.desc = NULL; metadata->am_buff_msg.context = gasnetc_recv_ctxt_to_op_ctxt(&metadata->am_buff_ctxt); metadata->am_buff_msg.data = 0; metadata->am_buff_ctxt.final_cntr = 0; metadata->am_buff_ctxt.event_cntr = 0; gasnetc_paratomic_set(&metadata->am_buff_ctxt.consumed_cntr, 0, 0); // Post multi-recv buffers for Active Messages struct fid_ep *epfd = (i % 2 == 0) ? 
gasnetc_ofi_request_epfd : gasnetc_ofi_reply_epfd; ret = fi_recvmsg(epfd, &metadata->am_buff_msg, FI_MULTI_RECV); GASNETC_OFI_CHECK_RET(ret, "fi_recvmsg failed"); } /* Allocate bounce buffers*/ bounce_region_size = GASNETI_PAGE_ALIGNUP(ofi_num_bbufs * ofi_bbuf_size); bounce_region_start = gasnetc_alloc_pages(bounce_region_size, "for bounce buffers"); { char valstr[16]; gasneti_format_number(bounce_region_size, valstr, sizeof(valstr), 1); GASNETI_TRACE_PRINTF(I, ("Allocated %s for %"PRIuSZ " bounce buffers", valstr, ofi_num_bbufs)); } /* Progress backwards so that when these buffers are added to the stack, they * will come off of it in order by address */ char* buf = (char*)bounce_region_start + (ofi_num_bbufs-1)*ofi_bbuf_size; for (i = 0; i < (int)ofi_num_bbufs; i++) { gasneti_assert(buf); gasnetc_ofi_bounce_buf_t* container = gasneti_malloc(sizeof(gasnetc_ofi_bounce_buf_t)); container->buf = buf; gasneti_lifo_push(&ofi_bbuf_pool, container); buf -= ofi_bbuf_size; } // Accounting to prevent dynamic over-allocation gasneti_semaphore_init(&num_unallocated_request_buffers, max_am_request_buffs - num_init_am_request_buffs, 0); gasneti_semaphore_init(&num_unallocated_reply_buffers, max_am_reply_buffs - num_init_am_reply_buffs, 0); size_t total_init = num_init_am_request_buffs + num_init_am_reply_buffs; am_buffers_region_size = GASNETI_PAGE_ALIGNUP(total_init*GASNETC_SIZEOF_AM_BUF_T); am_buffers_region_start = gasnetc_alloc_pages(am_buffers_region_size, "for AM send buffers"); { char valstr[16]; gasneti_format_number(am_buffers_region_size, valstr, sizeof(valstr), 1); GASNETI_TRACE_PRINTF(I, ("Allocated %s for %"PRIuSZ " (out of max %"PRIuSZ ") AM send buffers", valstr, total_init, max_am_request_buffs + max_am_reply_buffs)); } /* Add the buffers to the stack in reverse order to be friendly to the cache. 
*/ gasnetc_ofi_send_ctxt_t * bufp = (gasnetc_ofi_send_ctxt_t*) ((uintptr_t)am_buffers_region_start + GASNETC_SIZEOF_AM_BUF_T*(total_init - 1)); GASNETC_STAT_EVENT_VAL(ALLOC_REQ_BUFF, num_init_am_request_buffs); for (i = 0; i < (int)num_init_am_request_buffs; i++) { bufp->pool = &ofi_am_request_pool; gasneti_lifo_push(bufp->pool, bufp); bufp = (gasnetc_ofi_send_ctxt_t*)((uintptr_t)bufp - GASNETC_SIZEOF_AM_BUF_T); } GASNETC_STAT_EVENT_VAL(ALLOC_REP_BUFF, num_init_am_reply_buffs); for (i = 0; i < (int)num_init_am_reply_buffs; i++) { bufp->pool = &ofi_am_reply_pool; gasneti_lifo_push(bufp->pool, bufp); bufp = (gasnetc_ofi_send_ctxt_t*)((uintptr_t)bufp - GASNETC_SIZEOF_AM_BUF_T); } gasnetc_ofi_inited = 1; return GASNET_OK; } /*------------------------------------------------ * OFI conduit exit function * ----------------------------------------------*/ void gasnetc_ofi_exit(void) { if (!gasnetc_ofi_inited) return; #if GASNET_PAR && GASNETC_OFI_USE_THREAD_DOMAIN // Attempt to obtain (and *never* release) the big_lock in bounded time const uint64_t timeout_ns = 10 * 1000000000L; // TODO: arbitrary 10s const gasneti_tick_t t_start = gasneti_ticks_now(); GASNETI_SPIN_WHILE(EBUSY == GASNETC_OFI_TRYLOCK(&gasnetc_ofi_locks.big_lock), { if (timeout_ns < gasneti_ticks_to_ns(gasneti_ticks_now() - t_start)) return; }); #endif // (attempt to) cancel multi-recv operations for Active Messages for (int i = 0; i < num_multirecv_buffs; i++) { gasnetc_ofi_recv_ctxt_t *am_buff_ctxt = &metadata_array[i].am_buff_ctxt; struct fid_ep *epfd = (i % 2 == 0) ? 
gasnetc_ofi_request_epfd : gasnetc_ofi_reply_epfd; (void) fi_cancel(&epfd->fid, gasnetc_recv_ctxt_to_op_ctxt(am_buff_ctxt)); } #if GASNETI_CLIENT_THREADS /* Unsafe to free resources if other threads may be using them */ #else gasneti_free(metadata_array); gasneti_munmap(receive_region_start, receive_region_size); // TODO: when/if the following are proven safe //gasneti_munmap(bounce_region_start, bounce_region_size); //gasneti_munmap(am_buffers_region_start, am_buffers_region_size); #endif if(fi_close(&gasnetc_ofi_reply_epfd->fid)!=FI_SUCCESS) { gasneti_fatalerror("close am reply epfd failed\n"); } if(fi_close(&gasnetc_ofi_request_epfd->fid)!=FI_SUCCESS) { gasneti_fatalerror("close am request epfd failed\n"); } #if GASNET_SEGMENT_FAST || GASNET_SEGMENT_LARGE { // TODO: loop over clients gasneti_Client_t i_client = gasneti_import_client(gasneti_THUNK_CLIENT); for (gex_EP_Index_t ep_idx = 0; ep_idx < GASNET_MAXEPS; ++ep_idx) { gasneti_EP_t i_ep = i_client->_ep_tbl[ep_idx]; if (i_ep) gasnetc_ep_unbindsegment(i_ep); } } #else if(gasnetc_segment_mrfd && (fi_close(&gasnetc_segment_mrfd->fid)!=FI_SUCCESS)) { gasneti_fatalerror("close mrfd failed\n"); } #endif if (gasnetc_auxseg_mrfd && (fi_close(&gasnetc_auxseg_mrfd->fid) != FI_SUCCESS)) { gasneti_fatalerror("close auxseg mrfd failed\n"); } // This must follow closing MRs if bound due to FI_MR_ENDPOINT if(fi_close(&gasnetc_ofi_rdma_epfd->fid)!=FI_SUCCESS) { gasneti_fatalerror("close rdma epfd failed\n"); } if(fi_close(&gasnetc_ofi_tx_cqfd->fid)!=FI_SUCCESS) { gasneti_fatalerror("close am tx_cqfd failed\n"); } #if GASNETC_OFI_USE_MULTI_CQ if(gasnetc_ofi_reqtx_cqfd && fi_close(&gasnetc_ofi_reqtx_cqfd->fid)!=FI_SUCCESS) { gasneti_fatalerror("close am reqtx_cqfd failed\n"); } if(gasnetc_ofi_reptx_cqfd && fi_close(&gasnetc_ofi_reptx_cqfd->fid)!=FI_SUCCESS) { gasneti_fatalerror("close am reptx_cqfd failed\n"); } #endif if(fi_close(&gasnetc_ofi_reply_cqfd->fid)!=FI_SUCCESS) { gasneti_fatalerror("close am reply cqfd failed\n"); } 
if(fi_close(&gasnetc_ofi_request_cqfd->fid)!=FI_SUCCESS) { gasneti_fatalerror("close am request cqfd failed\n"); } if(fi_close(&gasnetc_ofi_avfd->fid)!=FI_SUCCESS) { gasneti_fatalerror("close av failed\n"); } if(fi_close(&gasnetc_ofi_domainfd->fid)!=FI_SUCCESS) { gasneti_fatalerror("close domainfd failed\n"); } if(fi_close(&gasnetc_ofi_fabricfd->fid)!=FI_SUCCESS) { gasneti_fatalerror("close fabricfd failed\n"); } if (GASNETC_OFI_HAS_MR_PROV_KEY) { // table indices run [-1, GASNET_MAXEPS) for (int epidx = -1; epidx < GASNET_MAXEPS; ++epidx) { gasneti_free(gasnetc_remote_key_tbl[epidx]); } gasneti_free(gasnetc_remote_key_tbl - 1); } #if USE_AV_MAP gasneti_free(addr_table); #endif } /*------------------------------------------------ * OFI conduit callback functions * ----------------------------------------------*/ /* Handle Active Messages */ GASNETI_INLINE(gasnetc_ofi_handle_am) void gasnetc_ofi_handle_am(gasnetc_ofi_am_send_buf_t *header, int isreq, size_t msg_len, uint64_t cq_data) { uint8_t *addr; int handler = header->handler; const gex_AM_Entry_t * const handler_entry = &gasnetc_handler[handler]; gex_AM_Fn_t handler_fn = handler_entry->gex_fnptr; gex_AM_Arg_t *args; int numargs = header->argnum; int data_offset; gex_Token_t token = (gex_Token_t)header; size_t nbytes; gasneti_assert_uint(isreq ,==, header->isreq); switch(header->type) { case OFI_AM_SHORT: args = (gex_AM_Arg_t *)header->buf.short_buf.data; GASNETI_RUN_HANDLER_SHORT(isreq, handler, handler_fn, token, args, numargs); break; case OFI_AM_MEDIUM: data_offset = GASNETI_ALIGNUP(sizeof(gex_AM_Arg_t)*numargs, GASNETI_MEDBUF_ALIGNMENT); args = (gex_AM_Arg_t *)header->buf.medium_buf.data; addr = header->buf.medium_buf.data + data_offset; nbytes = msg_len - header->overhead; GASNETI_RUN_HANDLER_MEDIUM(isreq, handler, handler_fn, token, args, numargs, addr, nbytes); break; case OFI_AM_LONG: data_offset = sizeof(gex_AM_Arg_t)*numargs; args = (gex_AM_Arg_t *)header->buf.long_buf.data; addr = 
header->buf.long_buf.dest_ptr; nbytes = cq_data; GASNETI_RUN_HANDLER_LONG(isreq, handler, handler_fn, token, args, numargs, addr, nbytes); break; case OFI_AM_LONG_MEDIUM: data_offset = sizeof(gex_AM_Arg_t)*numargs; args = (gex_AM_Arg_t *)header->buf.long_buf.data; addr = header->buf.long_buf.dest_ptr; nbytes = cq_data; memcpy(addr, header->buf.long_buf.data + data_offset, nbytes); GASNETI_RUN_HANDLER_LONG(isreq, handler, handler_fn, token, args, numargs, addr, nbytes); break; default: gasneti_unreachable_error(("undefined header type in gasnetc_ofi_handle_am: %d", header->type)); } } // Handle completion of a simple blocking operation void gasnetc_ofi_handle_blocking(void *op_context, unsigned int aux) { gasnetc_ofi_blocking_op_ctxt_t *ptr = gasneti_container_of(op_context, gasnetc_ofi_blocking_op_ctxt_t, callback); ptr->complete = 1; } // Handle RDMA completion as the initiator // TODO: refine if/when more bits of "aux" are defined void gasnetc_ofi_handle_rdma(void *op_context, unsigned int aux) { gasnetc_ofi_nb_op_ctxt_t *ptr = gasneti_container_of(op_context, gasnetc_ofi_nb_op_ctxt_t, callback); switch (ptr->type) { case OFI_TYPE_EGET: { gasnete_eop_t *eop = gasneti_container_of(ptr, gasnete_eop_t, ofi); gasnete_eop_check(eop); GASNETE_EOP_MARKDONE(eop); } break; case OFI_TYPE_EPUT: { gasnete_eop_t *eop = gasneti_container_of(ptr, gasnete_eop_t, ofi); gasnete_eop_check(eop); if (aux) GASNETE_EOP_LC_FINISH(eop); GASNETE_EOP_MARKDONE(eop); } break; case OFI_TYPE_IGET: { gasnete_iop_t *iop = gasneti_container_of(ptr, gasnete_iop_t, get_ofi); gasnete_iop_check(iop); GASNETE_IOP_CNT_FINISH(iop, get, 1, GASNETI_ATOMIC_REL); } break; case OFI_TYPE_IPUT: { gasnete_iop_t *iop = gasneti_container_of(ptr, gasnete_iop_t, put_ofi); gasnete_iop_check(iop); if (aux) GASNETE_IOP_LC_FINISH(iop); GASNETE_IOP_CNT_FINISH(iop, put, 1, GASNETI_ATOMIC_NONE); } break; default: gasneti_unreachable_error(("undefined OP type in gasnetc_ofi_rdma_poll: %d", ptr->type)); } } // Allocate an 
AM send buffer, spin-polling if necessary/allowed // TODO: should Reply be permitted to borrow from Request pool? GASNETI_INLINE(gasnetc_ofi_get_am_header) gasnetc_ofi_send_ctxt_t *gasnetc_ofi_get_am_header(int isreq, gex_Flags_t flags GASNETI_THREAD_FARG) { const gex_Flags_t imm = flags & GEX_FLAG_IMMEDIATE; gasneti_lifo_head_t* pool = isreq ? &ofi_am_request_pool : &ofi_am_reply_pool; gasnetc_ofi_send_ctxt_t *header = gasneti_lifo_pop(pool); #if GASNETC_IMMEDIATE_AMPOLLS if (header) return header; #else if (header || imm) return header; #endif // Poll only the tx queue and retry the pool before (maybe) allocating another buffer gasnetc_ofi_tx_poll(); header = gasneti_lifo_pop(pool); if (header) return header; // Allocate another unless doing so would exceed the max gasneti_semaphore_t* sema = isreq ? &num_unallocated_request_buffers : &num_unallocated_reply_buffers; if (gasneti_semaphore_trydown(sema)) { // TODO: cache-align and allocate more than one at a time header = gasneti_malloc(GASNETC_SIZEOF_AM_BUF_T); gasneti_leak(header); header->pool = pool; if (isreq) { GASNETC_STAT_EVENT_VAL(ALLOC_REQ_BUFF, 1); } else { GASNETC_STAT_EVENT_VAL(ALLOC_REP_BUFF, 1); } return header; } #if GASNETC_IMMEDIATE_AMPOLLS if (imm) return NULL; #else gasneti_assert(!imm); #endif // Spin-poll until a buffer is free // These are "DOUNTIL" since already know buffer pool is empty if (isreq) { GASNETI_SPIN_DOUNTIL((header = gasneti_lifo_pop(pool)), GASNETC_OFI_POLL_SELECTIVE(OFI_POLL_ALL)); } else { GASNETI_SPIN_DOUNTIL((header = gasneti_lifo_pop(pool)), GASNETC_OFI_POLL_SELECTIVE(OFI_POLL_REPLY)); } return header; } // Release unused AM send buffer #define gasnetc_ofi_free_am_header(header) \ gasneti_lifo_push(header->pool, header) // Process completed AM send // TODO: Async LC handling goes here void gasnetc_ofi_am_send_complete(gasnetc_ofi_send_ctxt_t *header) { gasnetc_ofi_free_am_header(header); } void gasnetc_ofi_handle_bounce_rdma(void *op_context, unsigned int aux) { 
gasnetc_ofi_bounce_op_ctxt_t *op = gasneti_container_of(op_context, gasnetc_ofi_bounce_op_ctxt_t, callback); if (gasnetc_paratomic_decrement_and_test(&op->cntr, 0)) { gasnetc_ofi_bounce_buf_t * bbuf_to_return; while (NULL != (bbuf_to_return = gasneti_lifo_pop(&op->bbuf_list))) gasneti_lifo_push(&ofi_bbuf_pool, bbuf_to_return); /* These completions will always be RDMA, so call the callback directly */ gasnetc_ofi_handle_rdma(gasnetc_rdma_ctxt_to_op_ctxt(op->orig_op,0),0); gasneti_lifo_push(&ofi_bbuf_ctxt_pool, op); } } GASNETI_INLINE(gasnetc_ofi_get_bounce_ctxt) gasnetc_ofi_bounce_op_ctxt_t* gasnetc_ofi_get_bounce_ctxt(void) { gasnetc_ofi_bounce_op_ctxt_t* ctxt = gasneti_lifo_pop(&ofi_bbuf_ctxt_pool); if (NULL == ctxt) { ctxt = gasneti_calloc(1,sizeof(gasnetc_ofi_bounce_op_ctxt_t)); ctxt->callback = gasnetc_ofi_handle_bounce_rdma; gasneti_lifo_init(&ctxt->bbuf_list); gasneti_leak(ctxt); } return ctxt; } /*------------------------------------------------ * Pre-post or pin-down memory * ----------------------------------------------*/ // Local registration of segment memory int gasnetc_ep_bindsegment(gasneti_EP_t i_ep, gasneti_Segment_t segment) { GASNETI_TRACE_PRINTF(C,("Binding segment [%p, %p) to EP %d", segment->_addr, segment->_ub, i_ep->_index)); void *segbase; uintptr_t segsize; struct fid_mr** mrfd_p; gasnetc_EP_t c_ep = (gasnetc_EP_t)i_ep; #if GASNET_SEGMENT_FAST || GASNET_SEGMENT_LARGE gasneti_assert(segment); segbase = segment->_addr; segsize = segment->_size; gasneti_assert(c_ep); mrfd_p = &c_ep->mrfd; #else if (!segment) { segbase = (void *)0; segsize = UINT64_MAX; mrfd_p = &gasnetc_segment_mrfd; } else if (gasneti_i_segment_kind_is_host((gasneti_Segment_t) segment)) { // No additional host memory registration required return GASNET_OK; } else { gasneti_unreachable_error(("ofi-conduit does not yet support non-host memory kinds in GASNET_SEGMENT_EVERYTHING mode")); } #endif // TBD: do we want local READ or WRITE in access? 
will atomics need them, for instance? struct iovec iov = { segbase, segsize }; uint64_t access = FI_REMOTE_READ | FI_REMOTE_WRITE; uint64_t key = GASNETC_EPIDX_TO_KEY(c_ep->_index); uint64_t flags = 0; struct fi_mr_attr attr = {0}; attr.mr_iov = &iov; attr.iov_count = 1; attr.access = access; attr.requested_key = key; attr.context = NULL; #if GASNET_HAVE_MK_CLASS_MULTIPLE gex_MK_Class_t mk_class; if (segment->_kind == GEX_MK_HOST) { gasneti_static_assert(FI_HMEM_SYSTEM == 0); mk_class = GEX_MK_CLASS_HOST; c_ep->device_only_segment = 0; } else { gasneti_MK_t i_mk = gasneti_import_mk(segment->_kind); mk_class = i_mk->_mk_class; switch (mk_class) { #if GASNET_HAVE_MK_CLASS_CUDA_UVA case GEX_MK_CLASS_CUDA_UVA: attr.iface = FI_HMEM_CUDA; attr.device.cuda = (int)(uintptr_t)i_mk->_mk_conduit; c_ep->device_only_segment = 1; #ifdef FI_HMEM_DEVICE_ONLY flags |= FI_HMEM_DEVICE_ONLY; #endif break; #endif #if GASNET_HAVE_MK_CLASS_HIP case GEX_MK_CLASS_HIP: attr.iface = FI_HMEM_ROCR; c_ep->device_only_segment = 0; break; #endif default: gasneti_unreachable_error(("undefined or unsupported gex_MK_Class_t value: %d", mk_class)); break; } } #endif #if GASNETC_HAVE_FI_MR_REG_ATTR const char *reg_fn = "fi_mr_regattr"; int ret = fi_mr_regattr(gasnetc_ofi_domainfd, &attr, flags, mrfd_p); #else const char *reg_fn = "fi_mr_reg"; int ret = fi_mr_reg(gasnetc_ofi_domainfd, attr.mr_iov->iov_base, attr.mr_iov->iov_len, attr.access, attr.offset, attr.requested_key, flags, mrfd_p, attr.context); #endif if (ret) { if (gasneti_VerboseErrors) { gasneti_console_message("WARNING", "Unexpected error %d (%s) from %s() when binding segment [%p, %p) to EP %d", ret, fi_strerror(-ret), reg_fn, segment->_addr, segment->_ub, i_ep->_index); } // TODO: can we do better sorting out failure modes? return GASNET_ERR_RESOURCE; } if (segment && ! 
GASNETC_OFI_HAS_MR_PROV_KEY) { gasneti_assert_uint(key ,==, fi_mr_key(*mrfd_p)); } #ifdef FI_MR_ENDPOINT if (gasnetc_fi_mr_endpoint) { ret = fi_mr_bind(*mrfd_p, &gasnetc_ofi_rdma_epfd->fid, 0); GASNETC_OFI_CHECK_RET(ret, "fi_mr_bind failed"); ret = fi_mr_enable(*mrfd_p); GASNETC_OFI_CHECK_RET(ret, "fi_mr_enable failed"); } #endif return GASNET_OK; } int gasnetc_ep_unbindsegment(gasneti_EP_t i_ep) { gasneti_assert(i_ep); GASNETI_TRACE_PRINTF(C,("Unbinding segment from EP %d", i_ep->_index)); #if GASNET_SEGMENT_FAST || GASNET_SEGMENT_LARGE gasnetc_EP_t c_ep = (gasnetc_EP_t)i_ep; if (c_ep->mrfd) { int ret = fi_close(&c_ep->mrfd->fid); GASNETC_OFI_CHECK_RET(ret, "fi_close(ep->mrfd) failed"); c_ep->mrfd = NULL; c_ep->_segment = NULL; } #endif return GASNET_OK; } // Exchange memory keys with other nodes, if needed void gasnetc_segment_exchange(gex_TM_t tm, gex_EP_t *eps, size_t num_eps) { if (!GASNETC_OFI_HAS_MR_PROV_KEY) return; // nothing to be done // Exchange a 64-bit mr key struct exchg_data { gex_EP_Location_t loc; uint64_t mr_key; } *local, *global, *p; size_t elem_sz = sizeof(struct exchg_data); local = gasneti_malloc(num_eps * elem_sz); // Pack p = local; for (gex_Rank_t i = 0; i < num_eps; ++i) { gasnetc_EP_t c_ep = (gasnetc_EP_t) gasneti_import_ep(eps[i]); if (! 
c_ep->_segment) continue; p->loc.gex_rank = gasneti_mynode; p->loc.gex_ep_index = c_ep->_index; gasneti_assert(c_ep->mrfd); p->mr_key = fi_mr_key(c_ep->mrfd); ++p; } size_t local_bytes = elem_sz * (p - local); size_t total_bytes = gasneti_blockingRotatedExchangeV(tm, local, local_bytes, (void**)&global, NULL); size_t total_eps = total_bytes / elem_sz; gasneti_free(local); // Unpack p = global; for (size_t i = 0; i < total_eps; ++i, ++p) { gex_EP_Index_t idx = p->loc.gex_ep_index; uint64_t *key_array = gasnetc_remote_key_tbl[idx]; if_pf (!key_array) { static gasneti_mutex_t lock = GASNETI_MUTEX_INITIALIZER; gasneti_mutex_lock(&lock); key_array = gasnetc_remote_key_tbl[idx]; if (!key_array) { key_array = gasneti_calloc(gasneti_nodes, sizeof(uint64_t)); gasneti_local_wmb(); gasnetc_remote_key_tbl[idx] = key_array; } gasneti_mutex_unlock(&lock); } else { gasneti_local_rmb(); } gex_Rank_t jobrank = p->loc.gex_rank; uint64_t key = p->mr_key; gasneti_assert(key_array[jobrank] == 0 || key_array[jobrank] == p->mr_key); key_array[jobrank] = p->mr_key; } gasneti_free(global); } void gasnetc_auxseg_register(gasnet_seginfo_t si) { int ret = fi_mr_reg(gasnetc_ofi_domainfd, si.addr, si.size, FI_REMOTE_READ | FI_REMOTE_WRITE, 0ULL, GASNETC_AUX_KEY, 0ULL, &gasnetc_auxseg_mrfd, NULL); GASNETC_OFI_CHECK_RET(ret, "fi_mr_reg for aux_seg failed"); #ifdef FI_MR_ENDPOINT if (gasnetc_fi_mr_endpoint) { ret = fi_mr_bind(gasnetc_auxseg_mrfd, &gasnetc_ofi_rdma_epfd->fid, 0); GASNETC_OFI_CHECK_RET(ret, "fi_mr_bind failed for aux_seg"); ret = fi_mr_enable(gasnetc_auxseg_mrfd); GASNETC_OFI_CHECK_RET(ret, "fi_mr_enable failed for aux_seg"); } #endif if (GASNETC_OFI_HAS_MR_PROV_KEY) { // Provider was not required to honor our key, so we exchange them uint64_t mr_key = fi_mr_key(gasnetc_auxseg_mrfd); uint64_t *aux_keys = gasneti_malloc(gasneti_nodes * sizeof(uint64_t)); gasneti_bootstrapExchange(&mr_key, sizeof(mr_key), aux_keys); gasneti_assert(! 
gasnetc_remote_key_tbl[-1]); gasnetc_remote_key_tbl[-1] = aux_keys; } else { gasneti_assert_uint(GASNETC_AUX_KEY ,==, fi_mr_key(gasnetc_auxseg_mrfd)); } } /*------------------------------------------------ * OFI conduit network poll function * ----------------------------------------------*/ // TX progress function: Handles either AM or RDMA outgoing operations // Returns the number of completions processed GASNETI_INLINE(gasnetc_ofi_tx_poll_one) int gasnetc_ofi_tx_poll_one(struct fid_cq* cqfd) { struct fi_cq_data_entry re[GASNETC_OFI_NUM_COMPLETIONS]; struct fi_cq_err_entry e; /* Read from Completion Queue */ /* In the case of using one global lock, a try-lock could prevent progress from * occurring if the big lock is being held often by another thread. Just lock in * this case */ #if GASNETC_OFI_USE_THREAD_DOMAIN GASNETC_OFI_LOCK(&gasnetc_ofi_locks.tx_cq); #else /* If another thread already has the queue lock, return as it is already * processing the queue */ if(EBUSY == GASNETC_OFI_TRYLOCK(&gasnetc_ofi_locks.tx_cq)) return 0; #endif int ret = fi_cq_read(cqfd, (void *)&re, GASNETC_OFI_NUM_COMPLETIONS); GASNETC_OFI_UNLOCK(&gasnetc_ofi_locks.tx_cq); if (ret != -FI_EAGAIN) { if_pf (ret < 0) { if (-FI_EAVAIL == ret) { GASNETC_OFI_LOCK_EXPR(&gasnetc_ofi_locks.tx_cq, gasnetc_fi_cq_readerr(cqfd, &e ,0)); if_pf (gasnetc_is_exit_error(e)) return 0; gasnetc_ofi_fatalerror("fi_cq_read for tx_poll failed with error", -e.err); } else gasnetc_ofi_fatalerror("fi_cq_read for tx_poll returned unexpected error", ret); } else { for (int i = 0; i < ret; i++) { if (re[i].flags & FI_SEND) { #if GASNET_DEBUG gasnetc_paratomic_decrement(&pending_am, 0); #endif gasnetc_ofi_send_ctxt_t *header = gasnetc_op_ctxt_to_send_ctxt(re[i].op_context); gasnetc_ofi_am_send_complete(header); } else if(re[i].flags & FI_WRITE || re[i].flags & FI_READ) { #if GASNET_DEBUG gasnetc_paratomic_decrement(&pending_rdma, 0); #endif gasnetc_op_ctxt_run_rdma_callback(re[i].op_context); } else { 
gasneti_fatalerror("Unknown completion type received for gasnetc_ofi_tx_poll\n"); } } } return ret; } gasneti_assert(ret == -FI_EAGAIN); return 0; } void gasnetc_ofi_tx_poll(void) { int rc = gasnetc_ofi_tx_poll_one(gasnetc_ofi_tx_cqfd); GASNETI_TRACE_EVENT_VAL(X, CQ_READ_TX, rc); #if GASNETC_OFI_USE_MULTI_CQ // TODO: use poll sets for providers/platforms which support them if (gasnetc_ofi_reqtx_cqfd) { rc = gasnetc_ofi_tx_poll_one(gasnetc_ofi_reqtx_cqfd); GASNETI_TRACE_EVENT_VAL(X, CQ_READ_REQTX, rc); } if (gasnetc_ofi_reptx_cqfd) { rc = gasnetc_ofi_tx_poll_one(gasnetc_ofi_reptx_cqfd); GASNETI_TRACE_EVENT_VAL(X, CQ_READ_REPTX, rc); } #endif } GASNETI_INLINE(gasnetc_ofi_am_recv_poll) void gasnetc_ofi_am_recv_poll(int is_request) { #if GASNETC_OFI_RETRY_RECVMSG static gasnetc_ofi_recv_ctxt_t *buffs_to_retry[2] = { NULL, NULL }; #endif struct fid_ep * ep; struct fid_cq * cq; gasneti_atomic_t * lock_p; if (is_request) { ep = gasnetc_ofi_request_epfd; cq = gasnetc_ofi_request_cqfd; #if GASNET_PAR && !GASNETC_OFI_USE_THREAD_DOMAIN lock_p = &gasnetc_ofi_locks.rx_request_cq; #endif } else { ep = gasnetc_ofi_reply_epfd; cq = gasnetc_ofi_reply_cqfd; #if GASNET_PAR && !GASNETC_OFI_USE_THREAD_DOMAIN lock_p = &gasnetc_ofi_locks.rx_reply_cq; #endif } int count; for (count = 0; count < GASNETC_OFI_EVENTS_PER_POLL; ++count) { if(EBUSY == GASNETC_OFI_PAR_TRYLOCK(lock_p)) goto out; /* Read from Completion Queue */ struct fi_cq_data_entry re = {0}; int ret = fi_cq_read(cq, (void *)&re, 1); if (ret == -FI_EAGAIN) { GASNETC_OFI_PAR_UNLOCK(lock_p); goto out; } if_pf (ret < 0) { struct fi_cq_err_entry e = {0}; gasnetc_fi_cq_readerr(cq, &e ,0); GASNETC_OFI_PAR_UNLOCK(lock_p); if_pf (gasnetc_is_exit_error(e)) goto out; gasnetc_ofi_fatalerror("fi_cq_read for am_recv_poll failed with error", -e.err); } gasnetc_ofi_recv_ctxt_t *header = gasnetc_op_ctxt_to_recv_ctxt(re.op_context); /* Count number of completions read for this posted buffer */ header->event_cntr++; #if GASNET_TRACE { 
gasnetc_ofi_recv_metadata_t* metadata = gasneti_container_of(header, gasnetc_ofi_recv_metadata_t, am_buff_ctxt); int buffer_num = metadata - metadata_array; uint64_t event_num = header->event_cntr - 1; uintptr_t offset = (uintptr_t)re.buf - (uintptr_t)metadata->iov.iov_base; GASNETI_TRACE_PRINTF(D,("fi_cq_read(%s) %d:%"PRIu64"%s%s", is_request?"req":"rep", buffer_num, event_num, (re.flags & FI_RECV)?gasneti_dynsprintf(" RECV@%"PRIuPTR"+%"PRIuSZ, offset, re.len) :"", (re.flags & FI_MULTI_RECV)?" UNLINK":"")); } #endif /* Record the total number of completions read */ if_pf (re.flags & FI_MULTI_RECV) { header->final_cntr = header->event_cntr; } GASNETC_OFI_PAR_UNLOCK(lock_p); if_pt (re.flags & FI_RECV) { // re.data contains the payload length for a Long gasnetc_ofi_handle_am(re.buf, is_request, re.len, re.data); } /* The atomic here ensures that the buffer is not reposted while an AM handler is * still running. */ uint64_t tmp = gasnetc_paratomic_add(&header->consumed_cntr, 1, GASNETI_ATOMIC_ACQ); if_pf (tmp == (GASNETI_ATOMIC_MAX & header->final_cntr)) { gasnetc_ofi_recv_metadata_t* metadata = gasneti_container_of(header, gasnetc_ofi_recv_metadata_t, am_buff_ctxt); struct fi_msg* am_buff_msg = &metadata->am_buff_msg; GASNETC_OFI_LOCK(&gasnetc_ofi_locks.am_rx); int post_ret = fi_recvmsg(ep, am_buff_msg, FI_MULTI_RECV); GASNETC_OFI_UNLOCK(&gasnetc_ofi_locks.am_rx); #if GASNETC_OFI_RETRY_RECVMSG if_pf (post_ret == -FI_EAGAIN) { GASNETC_OFI_PAR_LOCK(lock_p); header->next = buffs_to_retry[is_request]; buffs_to_retry[is_request] = header; post_ret = FI_SUCCESS; if (is_request) { GASNETI_TRACE_EVENT(C, RECVMSG_REQ_EAGAIN); } else { GASNETI_TRACE_EVENT(C, RECVMSG_REP_EAGAIN); } GASNETC_OFI_PAR_UNLOCK(lock_p); } #endif GASNETC_OFI_CHECK_RET(post_ret, "fi_recvmsg failed inside am_recv_poll"); if (is_request) { GASNETI_TRACE_EVENT(C, RECVMSG_REQ); } else { GASNETI_TRACE_EVENT(C, RECVMSG_REP); } } } #if GASNETC_OFI_RETRY_RECVMSG if_pf (buffs_to_retry[is_request]) { 
GASNETC_OFI_PAR_LOCK(lock_p); gasnetc_ofi_recv_ctxt_t **prev_p = &buffs_to_retry[is_request]; gasnetc_ofi_recv_ctxt_t *curr = *prev_p; while (curr) { gasnetc_ofi_recv_ctxt_t *next = curr->next; gasnetc_ofi_recv_metadata_t* metadata = gasneti_container_of(curr, gasnetc_ofi_recv_metadata_t, am_buff_ctxt); struct fi_msg* am_buff_msg = &metadata->am_buff_msg; #if GASNET_PAR && GASNETC_OFI_USE_THREAD_DOMAIN // avoid recursive acquire of big_lock int post_ret = fi_recvmsg(ep, am_buff_msg, FI_MULTI_RECV); #else GASNETC_OFI_LOCK(&gasnetc_ofi_locks.am_rx); int post_ret = fi_recvmsg(ep, am_buff_msg, FI_MULTI_RECV); GASNETC_OFI_UNLOCK(&gasnetc_ofi_locks.am_rx); #endif if (post_ret == -FI_EAGAIN) { prev_p = &curr->next; // retain curr in the list } else { GASNETC_OFI_CHECK_RET(post_ret, "deferred fi_recvmsg failed"); if (is_request) { GASNETI_TRACE_EVENT(C, RECVMSG_REQ_REPOST); } else { GASNETI_TRACE_EVENT(C, RECVMSG_REP_REPOST); } *prev_p = next; // remove curr from the list } curr = next; } GASNETC_OFI_PAR_UNLOCK(lock_p); } #endif out: if (is_request) { GASNETI_TRACE_EVENT_VAL(X, CQ_READ_REQ, count); } else { GASNETI_TRACE_EVENT_VAL(X, CQ_READ_REP, count); } } /* General progress function */ void gasnetc_ofi_poll() { gasnetc_ofi_tx_poll(); gasnetc_ofi_am_recv_poll(1); /* requests */ gasnetc_ofi_am_recv_poll(0); /* replies */ } /*------------------------------------------------ * OFI conduit am send functions * ----------------------------------------------*/ int gasnetc_ofi_am_send_short(gex_Rank_t dest, gex_AM_Index_t handler, int numargs, va_list argptr, int isreq, gex_Flags_t flags GASNETI_THREAD_FARG) { // Get a send buffer gasnetc_ofi_send_ctxt_t *header = gasnetc_ofi_get_am_header(isreq, flags GASNETI_THREAD_PASS); if (!header) { gasneti_assert(flags & GEX_FLAG_IMMEDIATE); return 1; } gasnetc_ofi_am_send_buf_t *sendbuf = &header->sendbuf; int ret = 0; struct fid_ep* ep; fi_addr_t am_dest; int poll_type; if (isreq) { ep = gasnetc_ofi_request_epfd; am_dest = 
gasnetc_fabric_addr(REQ, dest); poll_type = OFI_POLL_ALL; } else { ep = gasnetc_ofi_reply_epfd; am_dest = gasnetc_fabric_addr(REP, dest); poll_type = OFI_POLL_REPLY; } size_t len = sizeof(gex_AM_Arg_t) * numargs + offsetof(gasnetc_ofi_am_send_buf_t, buf.short_buf); len = GASNETI_ALIGNUP(len, GASNETI_MEDBUF_ALIGNMENT); // ensure multi-recv buffer alignment // Initialize metadata (handler, args, etc.) sendbuf->handler = (uint8_t) handler; sendbuf->sourceid = gasneti_mynode; sendbuf->type = OFI_AM_SHORT; sendbuf->argnum = numargs; sendbuf->isreq = isreq; gex_AM_Arg_t *arglist = (gex_AM_Arg_t*) sendbuf->buf.short_buf.data; for (int i = 0 ; i < numargs ; ++i) { arglist[i] = va_arg(argptr, gex_AM_Arg_t); } // Send if(len <= max_buffered_send) { OFI_INJECT_RETRY_IMM(&gasnetc_ofi_locks.am_tx, ret = fi_inject(ep, sendbuf, len, am_dest), poll_type, flags & GEX_FLAG_IMMEDIATE, out_imm); GASNETC_OFI_CHECK_RET(ret, "fi_inject for short am failed"); gasnetc_ofi_free_am_header(header); } else { struct fi_context *op_cxtx = gasnetc_send_ctxt_to_op_ctxt(header); OFI_INJECT_RETRY_IMM(&gasnetc_ofi_locks.am_tx, ret = fi_send(ep, sendbuf, len, NULL, am_dest, op_cxtx), poll_type, flags & GEX_FLAG_IMMEDIATE, out_imm); GASNETC_OFI_CHECK_RET(ret, "fi_send for short am failed"); #if GASNET_DEBUG gasnetc_paratomic_increment(&pending_am,0); #endif } return 0; out_imm: gasnetc_ofi_free_am_header(header); return 1; } int gasnetc_ofi_am_send_medium(gex_Rank_t dest, gex_AM_Index_t handler, void *source_addr, size_t nbytes, /* data payload */ int numargs, va_list argptr, int isreq, gex_Flags_t flags GASNETI_THREAD_FARG) { // Get a send buffer gasnetc_ofi_send_ctxt_t *header = gasnetc_ofi_get_am_header(isreq, flags GASNETI_THREAD_PASS); if (!header) { gasneti_assert(flags & GEX_FLAG_IMMEDIATE); return 1; } gasnetc_ofi_am_send_buf_t *sendbuf = &header->sendbuf; int ret = 0; struct fid_ep* ep; fi_addr_t am_dest; int poll_type; if (isreq) { ep = gasnetc_ofi_request_epfd; am_dest = 
gasnetc_fabric_addr(REQ, dest); poll_type = OFI_POLL_ALL; } else { ep = gasnetc_ofi_reply_epfd; am_dest = gasnetc_fabric_addr(REP, dest); poll_type = OFI_POLL_REPLY; } size_t len = GASNETI_ALIGNUP(sizeof(gex_AM_Arg_t)*numargs, GASNETI_MEDBUF_ALIGNMENT); memcpy((uint8_t *)(sendbuf->buf.medium_buf.data)+ len, source_addr, nbytes); len += (nbytes + offsetof(gasnetc_ofi_am_send_buf_t, buf.medium_buf)); len = GASNETI_ALIGNUP(len, GASNETI_MEDBUF_ALIGNMENT); // ensure multi-recv buffer alignment // Initialize metadata (handler, args, etc.) sendbuf->handler = (uint8_t) handler; sendbuf->sourceid = gasneti_mynode; sendbuf->type = OFI_AM_MEDIUM; sendbuf->argnum = numargs; sendbuf->isreq = isreq; gex_AM_Arg_t *arglist = (gex_AM_Arg_t*) sendbuf->buf.medium_buf.data; for (int i = 0 ; i < numargs ; ++i) { arglist[i] = va_arg(argptr, gex_AM_Arg_t); } // Enable reconstruction of nbytes from message length size_t overhead = len - nbytes; sendbuf->overhead = overhead; gasneti_assert_uint(overhead ,<, 256); // Send if(len <= max_buffered_send) { OFI_INJECT_RETRY_IMM(&gasnetc_ofi_locks.am_tx, ret = fi_inject(ep, sendbuf, len, am_dest), poll_type, flags & GEX_FLAG_IMMEDIATE, out_imm); GASNETC_OFI_CHECK_RET(ret, "fi_inject for medium am failed"); gasnetc_ofi_free_am_header(header); } else { struct fi_context *op_cxtx = gasnetc_send_ctxt_to_op_ctxt(header); OFI_INJECT_RETRY_IMM(&gasnetc_ofi_locks.am_tx, ret = fi_send(ep, sendbuf, len, NULL, am_dest, op_cxtx), poll_type, flags & GEX_FLAG_IMMEDIATE, out_imm); GASNETC_OFI_CHECK_RET(ret, "fi_send for medium am failed"); #if GASNET_DEBUG gasnetc_paratomic_increment(&pending_am,0); #endif } return 0; out_imm: gasnetc_ofi_free_am_header(header); return 1; } int gasnetc_ofi_am_send_long(gex_Rank_t dest, gex_AM_Index_t handler, void *source_addr, size_t nbytes, /* data payload */ void *dest_addr, int numargs, va_list argptr, int isreq, gex_Flags_t flags GASNETI_THREAD_FARG) { // Get a send buffer gasnetc_ofi_send_ctxt_t *header = 
gasnetc_ofi_get_am_header(isreq, flags GASNETI_THREAD_PASS); if (!header) { gasneti_assert(flags & GEX_FLAG_IMMEDIATE); return 1; } gasnetc_ofi_am_send_buf_t *sendbuf = &header->sendbuf; int ret = 0; struct fid_ep* ep; fi_addr_t am_dest; int poll_type; if (isreq) { ep = gasnetc_ofi_request_epfd; am_dest = gasnetc_fabric_addr(REQ, dest); poll_type = OFI_POLL_ALL; } else { ep = gasnetc_ofi_reply_epfd; am_dest = gasnetc_fabric_addr(REP, dest); poll_type = OFI_POLL_REPLY; } // Copy or Put the payload size_t len = sizeof(gex_AM_Arg_t)*numargs; if(len + nbytes < long_rma_threshold) { // Pack the payload if it's small enough memcpy(sendbuf->buf.long_buf.data + len, source_addr, nbytes); len += nbytes; sendbuf->type = OFI_AM_LONG_MEDIUM; } else if (flags & GEX_FLAG_IMMEDIATE) { // As long as we are stalling for remote completion of a Put, must "fail" IMMEDIATE goto out_imm; } else { // Launch the long data payload transfer with RMA operation gasnetc_ofi_blocking_op_ctxt_t lam_ctxt; lam_ctxt.complete = 0; lam_ctxt.callback = gasnetc_ofi_handle_blocking; // TODO: following line is only correct until AM supported on non-primordial EP const int rem_epidx = gasnetc_in_auxseg(dest, dest_addr) ? -1 : 0; gasnetc_EP_t c_ep = (gasnetc_EP_t)gasneti_import_ep(gasneti_THUNK_EP); OFI_INJECT_RETRY(&gasnetc_ofi_locks.rdma_tx, OFI_WRITE(c_ep, source_addr, nbytes, dest, rem_epidx, dest_addr, &lam_ctxt, 0), poll_type); GASNETC_OFI_CHECK_RET(ret, "fi_write failed for AM long"); #if GASNET_DEBUG gasnetc_paratomic_increment(&pending_rdma,0); #endif /* Because the order is not guaranteed between different ep, */ /* we send the am part after confirming the large rdma operation */ /* is successful. 
*/ GASNETI_SPIN_WHILE(!lam_ctxt.complete, GASNETC_OFI_POLL_SELECTIVE(poll_type)); sendbuf->type = OFI_AM_LONG; } len += offsetof(gasnetc_ofi_am_send_buf_t, buf.long_buf.data); len = GASNETI_ALIGNUP(len, GASNETI_MEDBUF_ALIGNMENT); // ensure multi-recv buffer alignment // Initialize metadata (handler, args, etc.) sendbuf->handler = (uint8_t) handler; sendbuf->sourceid = gasneti_mynode; sendbuf->argnum = numargs; sendbuf->isreq = isreq; sendbuf->buf.long_buf.dest_ptr = dest_addr; gex_AM_Arg_t *arglist = (gex_AM_Arg_t*) sendbuf->buf.long_buf.data; for (int i = 0 ; i < numargs ; ++i) { arglist[i] = va_arg(argptr, gex_AM_Arg_t); } if(len <= max_buffered_send) { OFI_INJECT_RETRY_IMM(&gasnetc_ofi_locks.am_tx, ret = fi_injectdata(ep, sendbuf, len, nbytes, am_dest), poll_type, flags & GEX_FLAG_IMMEDIATE, out_imm); GASNETC_OFI_CHECK_RET(ret, "fi_inject for long am failed"); gasnetc_ofi_free_am_header(header); } else { struct fi_context *op_cxtx = gasnetc_send_ctxt_to_op_ctxt(header); OFI_INJECT_RETRY_IMM(&gasnetc_ofi_locks.am_tx, ret = fi_senddata(ep, sendbuf, len, NULL, nbytes, am_dest, op_cxtx), poll_type, flags & GEX_FLAG_IMMEDIATE, out_imm); GASNETC_OFI_CHECK_RET(ret, "fi_send for long am failed"); #if GASNET_DEBUG gasnetc_paratomic_increment(&pending_am,0); #endif } return 0; out_imm: gasnetc_ofi_free_am_header(header); return 1; } GASNETI_INLINE(get_bounce_bufs) int get_bounce_bufs(int n, gasnetc_ofi_bounce_buf_t ** arr) { int i,j; gasnetc_ofi_bounce_buf_t* buf_container; for (i = 0; i < n; i++) { buf_container = gasneti_lifo_pop(&ofi_bbuf_pool); if (!buf_container) { for (j = i -1; j >= 0; j--) { gasneti_lifo_push(&ofi_bbuf_pool, arr[j]); } return 0; } arr[i] = buf_container; } return 1; } /*------------------------------------------------ * OFI conduit one-sided put/get functions * ----------------------------------------------*/ /* There is not a good semantic match between GASNet and OFI in the non-blocking, * non-bulk puts due to local completion requirements. 
This function handles this * special case. * * Returns a valid event if necessary to block for remote completion. * Otherwise returns GEX_EVENT_INVALID */ gex_Event_t gasnetc_rdma_put_non_bulk(gex_TM_t tm, gex_Rank_t rank, void* dest_addr, void* src_addr, size_t nbytes, gasnetc_ofi_nb_op_ctxt_t* ctxt_ptr, gex_Flags_t flags GASNETI_THREAD_FARG) { const gex_EP_Location_t loc = gasneti_e_tm_rank_to_location(tm, rank, 0); const gex_Rank_t jobrank = loc.gex_rank; const int rem_epidx = gasnetc_in_auxseg(jobrank, dest_addr) ? -1 : loc.gex_ep_index; gasnetc_EP_t c_ep = (gasnetc_EP_t)gasneti_e_tm_to_i_ep(tm); int i; int ret = FI_SUCCESS; uintptr_t src_ptr = (uintptr_t)src_addr; gasnetc_assert_callback_eq(ctxt_ptr, gasnetc_ofi_handle_rdma); PERIODIC_RMA_POLL(); #if GASNET_HAVE_MK_CLASS_CUDA_UVA // CUDA device memory precludes bounce buffers and (at least currently) use of FI_INJECT if (c_ep->device_only_segment) goto block_anyways; #endif /* The payload can be injected without need for a bounce buffer */ if (nbytes <= max_buffered_write) { struct fi_msg_rma msg; msg.desc = 0; msg.addr = gasnetc_fabric_addr(RDMA, jobrank); struct iovec iovec; struct fi_rma_iov rma_iov; iovec.iov_base = src_addr; iovec.iov_len = nbytes; rma_iov.addr = gasnetc_remote_addr(jobrank, dest_addr, rem_epidx); rma_iov.key = gasnetc_remote_key(jobrank, rem_epidx); rma_iov.len = nbytes; msg.context = gasnetc_rdma_ctxt_to_op_ctxt(ctxt_ptr,0); msg.msg_iov = &iovec; msg.iov_count = 1; msg.rma_iov = &rma_iov; msg.rma_iov_count = 1; OFI_INJECT_RETRY_IMM(&gasnetc_ofi_locks.rdma_tx, ret = fi_writemsg(gasnetc_ofi_rdma_epfd, &msg, FI_INJECT | FI_DELIVERY_COMPLETE), OFI_POLL_ALL, flags & GEX_FLAG_IMMEDIATE, out_imm_inject); GASNETC_OFI_CHECK_RET(ret, "fi_writemsg with FI_INJECT failed"); #if GASNET_DEBUG gasnetc_paratomic_increment(&pending_rdma,0); #endif GASNETC_STAT_EVENT(NB_PUT_INJECT); return GEX_EVENT_INVALID; out_imm_inject: return GEX_EVENT_NO_OP; } /* Bounce buffers are needed */ else if (nbytes <= 
gasnetc_ofi_bbuf_threshold) { uintptr_t dest_ptr = (uintptr_t)dest_addr; int num_bufs_needed = (nbytes + ofi_bbuf_size - 1) / ofi_bbuf_size; size_t bytes_to_copy; gasnetc_ofi_bounce_buf_t* buffs[OFI_MAX_NUM_BOUNCE_BUFFERS]; /* If there are not enough bounce buffers available, simply block as * we don't know when more will become available */ ret = get_bounce_bufs(num_bufs_needed, buffs); if (!ret) goto block_anyways; gasnetc_ofi_bounce_op_ctxt_t * bbuf_ctxt = gasnetc_ofi_get_bounce_ctxt(); gasnetc_assert_callback_eq(bbuf_ctxt, gasnetc_ofi_handle_bounce_rdma); bbuf_ctxt->orig_op = ctxt_ptr; gasnetc_paratomic_set(&bbuf_ctxt->cntr, num_bufs_needed, 0); i = 0; gasnetc_ofi_bounce_buf_t* buf_container; gex_Flags_t imm = flags & GEX_FLAG_IMMEDIATE; while (num_bufs_needed > 0) { bytes_to_copy = num_bufs_needed != 1 ? ofi_bbuf_size : nbytes; gasneti_assert(bytes_to_copy <= ofi_bbuf_size); buf_container = buffs[i]; gasneti_lifo_push(&bbuf_ctxt->bbuf_list, buf_container); memcpy(buf_container->buf, (void*)src_ptr, bytes_to_copy); OFI_INJECT_RETRY_IMM(&gasnetc_ofi_locks.rdma_tx, OFI_WRITE(c_ep, buf_container->buf, bytes_to_copy, jobrank, rem_epidx, (void *)dest_ptr, bbuf_ctxt, 0), OFI_POLL_ALL, imm, out_imm_bounce); imm = 0; // no going back once first buffer has been written GASNETC_OFI_CHECK_RET(ret, "fi_write for bounce buffered data failed"); #if GASNET_DEBUG gasnetc_paratomic_increment(&pending_rdma,0); #endif /* Update our pointers to locations in memory */ nbytes -= ofi_bbuf_size; dest_ptr += ofi_bbuf_size; src_ptr += ofi_bbuf_size; num_bufs_needed--; i++; } GASNETC_STAT_EVENT(NB_PUT_BOUNCE); return GEX_EVENT_INVALID; out_imm_bounce: gasneti_assume(i == 0); for (/*empty*/; i < num_bufs_needed; ++i) { gasneti_lifo_push(&ofi_bbuf_pool, buffs[i]); } gasneti_lifo_init(&bbuf_ctxt->bbuf_list); gasneti_lifo_push(&ofi_bbuf_ctxt_pool, bbuf_ctxt); return GEX_EVENT_NO_OP; } /* We tried our best to optimize this. 
Just wait for remote completion */ else { block_anyways: GASNETC_STAT_EVENT(NB_PUT_BLOCK); gasnete_eop_t *eop = gasnete_eop_new(GASNETI_MYTHREAD); eop->ofi.type = OFI_TYPE_EPUT; if (gasnetc_rdma_put(tm, rank, dest_addr, src_addr, nbytes, &eop->ofi, 0, flags GASNETI_THREAD_PASS)) { gasneti_assert(flags & GEX_FLAG_IMMEDIATE); GASNETE_EOP_MARKDONE(eop); gasnete_eop_free(eop GASNETI_THREAD_PASS); return GEX_EVENT_NO_OP; } return (gex_Event_t)eop; } } int gasnetc_rdma_put(gex_TM_t tm, gex_Rank_t rank, void *dest_addr, void *src_addr, size_t nbytes, gasnetc_ofi_nb_op_ctxt_t *ctxt_ptr, int alc, gex_Flags_t flags GASNETI_THREAD_FARG) { const gex_EP_Location_t loc = gasneti_e_tm_rank_to_location(tm, rank, 0); const gex_Rank_t jobrank = loc.gex_rank; const int rem_epidx = gasnetc_in_auxseg(jobrank, dest_addr) ? -1 : loc.gex_ep_index; gasnetc_EP_t c_ep = (gasnetc_EP_t)gasneti_e_tm_to_i_ep(tm); int ret = FI_SUCCESS; gasnetc_assert_callback_eq(ctxt_ptr, gasnetc_ofi_handle_rdma); gasneti_assert((alc == 0) || (alc == 1)); PERIODIC_RMA_POLL(); OFI_INJECT_RETRY_IMM(&gasnetc_ofi_locks.rdma_tx, OFI_WRITE(c_ep, src_addr, nbytes, jobrank, rem_epidx, dest_addr, ctxt_ptr, alc), OFI_POLL_ALL, flags & GEX_FLAG_IMMEDIATE, out_imm); GASNETC_OFI_CHECK_RET(ret, "fi_write failed"); #if GASNET_DEBUG gasnetc_paratomic_increment(&pending_rdma,0); #endif return 0; out_imm: return 1; } int gasnetc_rdma_get(void *dest_addr, gex_TM_t tm, gex_Rank_t rank, void * src_addr, size_t nbytes, gasnetc_ofi_nb_op_ctxt_t *ctxt_ptr, gex_Flags_t flags GASNETI_THREAD_FARG) { const gex_EP_Location_t loc = gasneti_e_tm_rank_to_location(tm, rank, 0); const gex_Rank_t jobrank = loc.gex_rank; const int rem_epidx = gasnetc_in_auxseg(jobrank, src_addr) ? 
-1 : loc.gex_ep_index; gasnetc_EP_t c_ep = (gasnetc_EP_t)gasneti_e_tm_to_i_ep(tm); int ret = FI_SUCCESS; gasnetc_assert_callback_eq(ctxt_ptr, gasnetc_ofi_handle_rdma); PERIODIC_RMA_POLL(); OFI_INJECT_RETRY_IMM(&gasnetc_ofi_locks.rdma_tx, OFI_READ(c_ep, dest_addr, nbytes, jobrank, rem_epidx, src_addr, ctxt_ptr, 0), OFI_POLL_ALL, flags & GEX_FLAG_IMMEDIATE, out_imm); GASNETC_OFI_CHECK_RET(ret, "fi_read failed"); #if GASNET_DEBUG gasnetc_paratomic_increment(&pending_rdma,0); #endif return 0; out_imm: return 1; } #if GASNET_HAVE_MK_CLASS_MULTIPLE int gasnetc_mk_create_hook( gasneti_MK_t kind, gasneti_Client_t client, const gex_MK_Create_args_t *args, gex_Flags_t flags) { // Fail if the FI_HMEM capability was not found at initialization // TODO: fall back to reference implementation when we have one if (!gasnetc_fi_hmem) { GASNETI_RETURN_ERRR(RESOURCE, gasneti_dynsprintf("Provider '%s' reports no support for FI_HMEM needed for memory kinds", gasnetc_ofi_provider)); } // TODO: Fail (later fall back to ref) if the given device support is not present. // // The libfabric maintainers say that attempting a memory registration with // a given iface value is the only reliable way to determine if the device // support is present (see https://github.com/ofiwg/libfabric/issues/7973 ). // However, libfabric releases through (at least) 1.15.2 incorrectly return // success from fi_mr_regattr() when called with an iface value which is not // supported (see https://github.com/ofiwg/libfabric/issues/7977 ). // Once that is resolved, we can/should attempt a small registration // here and look for `-FI_ENOSYS` as an indication that the requested // device support is missing. 
// Capture the user's device argument for use in memory registration switch (args->gex_class) { #if GASNET_HAVE_MK_CLASS_CUDA_UVA case GEX_MK_CLASS_CUDA_UVA: kind->_mk_conduit = (void*)(uintptr_t)args->gex_args.gex_class_cuda_uva.gex_CUdevice; break; #endif #if GASNET_HAVE_MK_CLASS_HIP case GEX_MK_CLASS_HIP: // No device needed for HIP break; #endif default: gasneti_unreachable_error(("unknown or unsupported gex_MK_Class_t value: %d", args->gex_class)); break; } return GASNET_OK; } #endif