/* * UPC Runtime initialization code * * $Source: bitbucket.org:berkeleylab/upc-runtime.git/upcr_init.c $ */ #include #include #include #if PLATFORM_OS_IRIX #define signal(a,b) bsd_signal(a,b) #endif typedef void (*upcri_sighandlerfn_t)(int); #define MEGABYTE (1024*1024LU) // TODO-EX: initializers? gex_Client_t upcri_client; gex_EP_t upcri_ep; gex_TM_t upcri_tm; gex_Segment_t upcri_segment; // Vector of gex_RankInfo_t enumerating local Host membership static gex_Rank_t upcri_my_host_size; static gex_RankInfo_t *upcri_my_host; #if UPCRI_USING_PSHM // Vector of gex_RankInfo_t enumerating local Nbrhd membership static gex_Rank_t upcri_my_nbrhd_size; static gex_RankInfo_t *upcri_my_nbrhd; #endif /* for debugging support */ volatile int bupc_frozen = 1; static const char * upcri_main_name = NULL; GASNETT_THREADKEY_DEFINE(upcri_AMhandlercontext_key); struct upcri_spawn_arg { int argc; char **argv; int mythread; #if UPCRI_UPC_PTHREADS int mypthread; #endif uintptr_t static_data_size; uintptr_t default_cache_size; struct upcr_startup_spawnfuncs *spawnfuncs; }; /* TODO: move to caching implementation file when there is one: * right now these are placeholders */ uintptr_t upcri_cache_size = 0; void upcri_init_cache(void *start, uintptr_t len) { return; } extern void upcri_init_extern(void); #if 0 && UPCR_EXTEND_PRINTF /* * Functions to extend printf, etc., to recognize %S and %P */ #include static int print_shared_arginfo(const struct printf_info *info, size_t n, int *argtypes) { if (n > 0) argtypes[0] = PA_POINTER; return 1; } static int print_shared(FILE *stream, const struct printf_info *info, const void *const *args) { upcr_shared_ptr_t *psptr; int len; unsigned int node, phase; UPCR_BEGIN_FUNCTION(); psptr = *((upcr_shared_ptr_t**)(args[0])); return upcri_dump_shared(*psptr, stream); } static int print_pshared_arginfo(const struct printf_info *info, size_t n, int *argtypes) { if (n > 0) argtypes[0] = PA_POINTER; return 1; } static int print_pshared(FILE *stream, const struct printf_info *info, const void *const *args) { upcr_pshared_ptr_t *psptr; int len; unsigned int node, phase; UPCR_BEGIN_FUNCTION(); psptr = *((upcr_pshared_ptr_t**)(args[0])); return upcri_dump_pshared(*psptr, stream); } #endif /* UPCR_EXTEND_PRINTF */ #ifdef UPCRI_UPC_PTHREADS /* generated by linker */ extern upcri_pthreadinfo_t* upcri_linkergenerated_tld_init(void); #endif #if GASNET_TRACE int upcri_trace_suppresslocal = 0; #endif #ifndef UPCRI_UNFREEZE_SIGNAL /* signal to use for unfreezing, could also use SIGUSR1/2 or several others */ #define UPCRI_UNFREEZE_SIGNAL SIGCONT #define UPCRI_UNFREEZE_SIGNAL_STR "SIGCONT" #endif static void upcri_unfreezeHandler(int sig) { bupc_frozen = 0; } /* allows attaching debugger as early as possible after gex_Init_Client */ static void upcri_startup_freeze_early(upcr_thread_t freezenode) { pid_t pid; upcri_sighandlerfn_t oldhandler; pid = getpid(); if (freezenode >= upcri_nodes) freezenode = 0; oldhandler = (upcri_sighandlerfn_t)signal(UPCRI_UNFREEZE_SIGNAL, upcri_unfreezeHandler); if (upcri_mynode == freezenode) { bupc_frozen = 1; printf( "************************************************************************\n" "****** Early Freeze of UPC application for debugging ******\n" "************************************************************************\n" "Node %d (pid %d on %s) is frozen: attach a debugger to it and set\n" " the 'bupc_frozen' variable to 0 to continue, or send a " UPCRI_UNFREEZE_SIGNAL_STR ".\n" " - To debug additional nodes, attach to them before unfreezing node %d.\n", (int)upcri_mynode, (int)pid, gasnett_gethostname(), (int)upcri_mynode); if (upcri_main_name != NULL) { printf( " - Note: if you wish to set a breakpoint at 'main', use '%s' instead.\n", upcri_main_name); } fflush(stdout); } UPCRI_SINGLE_BARRIER_WITHCODE_NOTHR(sleep(1)); if (upcri_mynode == freezenode) { while (bupc_frozen == 1) gasnett_sched_yield(); /* yield CPU to speed debugger operation */ } else { printf("Node %d (pid %d on %s) waiting for node %d\n", (int)upcri_mynode, (int)pid, gasnett_gethostname(), freezenode); fflush(stdout); sleep(1); } UPCRI_SINGLE_BARRIER_NOTHR(); signal(UPCRI_UNFREEZE_SIGNAL, oldhandler); } /* To support debugging of application code: pthreads launched, * and data structures all initialized by time this is called. */ #define upcri_startup_freeze(pargs, freezethread) \ _upcri_startup_freeze(pargs, freezethread UPCRI_PT_PASS) static void _upcri_startup_freeze(struct upcri_spawn_arg *pargs, upcr_thread_t freezethread UPCRI_PT_ARG) { pid_t pid; upcri_sighandlerfn_t oldhandler; pid = getpid(); if (freezethread >= upcr_threads()) freezethread = 0; oldhandler = (upcri_sighandlerfn_t)signal(UPCRI_UNFREEZE_SIGNAL, upcri_unfreezeHandler); if (pargs->mythread == freezethread) { bupc_frozen = 1; printf( "************************************************************************\n" "************* Freezing UPC runtime for debugging *******************\n" "************************************************************************\n" "Thread %d (pid %d on %s) is frozen: attach a debugger to it and set\n" " the 'bupc_frozen' variable to 0 to continue, or send a " UPCRI_UNFREEZE_SIGNAL_STR ".\n" " - To debug additional UPC threads, attach to them before unfreezing thread %d\n", freezethread, (int)pid, gasnett_gethostname(), freezethread); if (upcri_main_name != NULL) { printf( " - Note: if you wish to set a breakpoint at 'main', use '%s' instead\n", upcri_main_name); } fflush(stdout); } UPCRI_SINGLE_BARRIER_WITHCODE(sleep(1)); if (pargs->mythread == freezethread) { while (bupc_frozen == 1) gasnett_sched_yield(); /* yield CPU to speed debugger operation */ } else { printf("Thread %d (pid %d on %s) waiting for thread %d\n", pargs->mythread, (int)pid, gasnett_gethostname(), freezethread); fflush(stdout); sleep(1); } UPCRI_SINGLE_BARRIER(); signal(UPCRI_UNFREEZE_SIGNAL, oldhandler); } void upcri_startup_messages(void) { if (upcri_mynode == 0) { char tmp[1024]; const char *warning = gasnett_performance_warning_str(); strcpy(tmp,warning); #ifdef UPCRI_GASP strcat(tmp," GASP performance instrumentation\n"); #endif if (*tmp) { if (!upcr_getenv("UPC_NO_WARN")) { fprintf(stderr, "-----------------------------------------------------------------------\n" " WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING\n" "\n" " This application was built from a Berkeley UPC installation that\n" " was configured and built with these optional features enabled:\n" "%s" " This usually has a SERIOUS impact on performance, so you should NOT\n" " trust any performance numbers reported in this program run!!!\n" "\n" " To suppress this message, pass '-quiet' to the upcrun command or set\n" " the UPC_NO_WARN or UPC_QUIET environment variables.\n" "-----------------------------------------------------------------------\n" ,tmp); fflush(stderr); } } #ifdef GASNET_SEGMENT_EVERYTHING fprintf(stderr, "-----------------------------------------------------------------------\n" " WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING\n" "\n" " This application was built from a Berkeley UPC installation that\n" " was configured with --enable-segment-everything.\n" " This GASNet segment mode is currently not supported by Berkeley UPC.\n" " Correct operation of this application is not guaranteed.\n" "\n" " To suppress this message, pass '-quiet' to the upcrun command or set\n" " the UPC_NO_WARN or UPC_QUIET environment variables.\n" "-----------------------------------------------------------------------\n" ); fflush(stderr); #endif } UPCRI_SINGLE_BARRIER_NOTHR(); { int threadwidth = ((int)log10(MAX(1,upcri_1stthread(upcri_nodes-1))))+1; int hthreadwidth = ((int)log10(MAX(1,upcr_threads()-1)))+1; int processwidth = ((int)log10(upcri_nodes))+1; unsigned long mypid = (unsigned long) getpid(); #if UPCRI_USING_PSHM char *sninfo; { const char fmt[] = "pshm node %*d of %*d, "; gex_Rank_t nbrhd_set_size, nbrhd_set_rank; gex_System_QueryMyPosition(&nbrhd_set_size, &nbrhd_set_rank, NULL, NULL); int snwidth = 1 + (int)log10(MAX(1,nbrhd_set_size-1)); int len = 2*snwidth + (sizeof(fmt) - 6); sninfo = UPCRI_XMALLOC(char, len); snprintf(sninfo, len, fmt, snwidth, nbrhd_set_rank, snwidth, nbrhd_set_size); } #else char sninfo[] = ""; #endif /* DOB: this is written carefully to align the messages in a readable way and present the information in order of relevance. Number of extraneous characters should be kept to an absolute minimum so that lines will fit on the users screen without wrapping. Both of these properties are especially important for readability when there are many processes. */ if (upcri_mypthreads() == 1) { printf("UPCR: UPC thread %*i of %i on %s (%sprocess %*d of %*d, pid=%lu)\n", threadwidth, (int)upcri_1stthread(upcri_mynode), (int)upcr_threads(), gasnett_gethostname(), sninfo, processwidth, (int)upcri_mynode, processwidth, (int)upcri_nodes, (unsigned long)mypid); } else { printf("UPCR: UPC threads %*i..%*i of %i on %s (%sprocess %*d of %*d, pid=%lu)\n", threadwidth, (int)upcri_1stthread(upcri_mynode), hthreadwidth, (int)(upcri_1stthread(upcri_mynode) + upcri_mypthreads() - 1), (int)upcr_threads(), gasnett_gethostname(), sninfo, processwidth, (int)upcri_mynode, processwidth, (int)upcri_nodes, (unsigned long)mypid); } fflush(stdout); } sleep(1); UPCRI_SINGLE_BARRIER_NOTHR(); } #ifdef UPCRI_UPC_PTHREADS /* Parses UPC_PTHREADS_MAP string, returning # of threads in map. * If 'fill' set, also fills in mappings for UPC threads */ static int parse_pthread_map(char *mapstr, int fill) { char * p = mapstr; int nodecount = 0; int threadcount = 0; int i; /* count number of entries in map: must equal # of nodes */ while (isspace((int)*p)) p++; while (*p) { int pthreads = atoi(p); if (!pthreads) upcri_err("invalid format for UPC_PTHREADS_MAP: '%s'", mapstr); if (pthreads > UPCR_MAX_PTHREADS) upcri_err( "value %d in UPC_PTHREADS_MAP (%s) > UPCR_MAX_PTHREADS (%d). " "To enable larger pthread counts on most systems, rebuild UPCR with: configure --with-max-pthreads-per-node=N", pthreads, mapstr, UPCR_MAX_PTHREADS); if (fill) { upcri_node2pthreads[nodecount] = pthreads; upcri_node1stthread[nodecount] = threadcount; } for (i = 0; i < pthreads; i++) { if (fill) { upcri_thread2node[threadcount] = nodecount; upcri_thread2pthread[threadcount] = i; } threadcount++; } nodecount++; while (*p && !isspace((int)*p)) /* skip to next number */ p++; while (*p && isspace((int)*p)) p++; } if (nodecount != upcri_nodes) { upcri_err( "# of entries in UPC_PTHREADS_MAP ('%s') != number of processes (%d)", mapstr, (int)upcri_nodes); } return threadcount; } #endif static void get_thread_info(upcr_thread_t static_threadcnt, upcr_thread_t default_pthreads_per_proc) { #if UPCRI_UPC_PTHREADS char *str; upcr_thread_t i; #endif #if UPCRI_UPC_PTHREADS /* figure out how many UPC threads exist in the job */ if ( (str = gasnett_getenv_withdefault("UPC_PTHREADS_MAP",NULL)) != NULL) { upcri_threads = parse_pthread_map(str, 0); upcri_thread2node = UPCRI_XMALLOC_EARLY(gex_Rank_t, upcri_threads); upcri_thread2pthread = UPCRI_XMALLOC_EARLY(upcri_pthread_t, upcri_threads); upcri_node2pthreads = UPCRI_XMALLOC_EARLY(upcri_pthread_t, upcri_nodes); upcri_node1stthread = UPCRI_XMALLOC_EARLY(upcri_pthread_t, upcri_nodes+1); parse_pthread_map(str, 1); /* fill in thread mappings */ } else { upcri_pthread_t pthreads_per = gasnett_getenv_int_withdefault("UPC_PTHREADS_PER_PROC",default_pthreads_per_proc,0); if (pthreads_per <= 0) upcri_err("Illegal value for UPC_PTHREADS_PER_PROC: %d", pthreads_per); if (pthreads_per > UPCR_MAX_PTHREADS) upcri_err("UPC_PTHREADS_PER_PROC (%d) > UPCR_MAX_PTHREADS (%d). " "To enable larger pthread counts on most systems, rebuild UPCR with: configure --with-max-pthreads-per-node=N", pthreads_per, UPCR_MAX_PTHREADS); upcri_threads = upcri_nodes * pthreads_per; upcri_thread2node = UPCRI_XMALLOC_EARLY(gex_Rank_t, upcri_threads); upcri_thread2pthread = UPCRI_XMALLOC_EARLY(upcri_pthread_t, upcri_threads); for (i = 0; i < upcri_threads; i++) { upcri_thread2node[i] = i / pthreads_per; upcri_thread2pthread[i] = i % pthreads_per; } upcri_node2pthreads = UPCRI_XMALLOC_EARLY(upcri_pthread_t, upcri_nodes); upcri_node1stthread = UPCRI_XMALLOC_EARLY(upcri_pthread_t, upcri_nodes+1); for (i = 0; i < upcri_nodes; i++) { upcri_node2pthreads[i] = pthreads_per; upcri_node1stthread[i] = i * pthreads_per; } upcri_node1stthread[upcri_nodes] = upcri_threads; } upcri_mypthread_cnt = upcri_node2pthreads[upcri_mynode]; #else upcri_threads = upcri_nodes; #endif if (static_threadcnt != 0 && static_threadcnt != upcri_threads) { upcri_err("program was compiled with %i static threads, " "but executed with %i threads.", static_threadcnt, upcri_threads); } if ((uint64_t)upcri_nodes > (uint64_t)UPCR_MAXNODES) upcri_err("too many runtime nodes! UPCR_MAXNODES=%llu, node count=%llu", (unsigned long long)UPCR_MAXNODES, (unsigned long long)upcri_nodes); if (upcri_threads > UPCR_MAX_THREADS) upcri_err("too many runtime threads! UPCR_MAX_THREADS=%llu, thread count=%llu", (unsigned long long)UPCR_MAX_THREADS, (unsigned long long)upcri_threads); } /**************************************************************************** * Low-level initialization functions ****************************************************************************/ /* ensures that startup functions called in correct order */ upcri_startup_t upcri_startup_lvl = upcri_startup_init; void upcr_startup_init(int *pargc, char ***pargv, upcr_thread_t static_threadcnt, upcr_thread_t default_pthreads_per_proc, const char * main_name) { int err; if (upcri_startup_lvl >= upcri_startup_done) return; if (upcri_startup_lvl != upcri_startup_init) upcri_early_err("upcr_startup_init called while upcri_startup_lvl=%d", upcri_startup_lvl); upcri_startup_lvl++; gasnett_maximize_rlimits(); if ( (err = gex_Client_Init(&upcri_client, &upcri_ep, &upcri_tm, "UPCR", pargc, pargv, GEX_FLAG_USES_GASNET1))) upcri_gaserr(err, "gex_Client_Init failed!"); #ifdef GASNET_STATS upcri_stats_enabled = GASNETT_STATS_INIT(upcri_stats_finish); #endif #if UPCR_DEBUGMALLOC upcri_debug_malloc = gasnett_getenv_yesno_withdefault("UPC_DEBUG_MALLOC",1); #endif upcri_mynode = gex_TM_QueryRank(upcri_tm); upcri_nodes = gex_TM_QuerySize(upcri_tm); get_thread_info( (static_threadcnt <= 0 ? 0 : static_threadcnt), default_pthreads_per_proc); #if 0 && UPCR_EXTEND_PRINTF register_printf_function('S', print_shared, print_shared_arginfo); register_printf_function('P', print_pshared, print_pshared_arginfo); #endif /* UPCR_EXTEND_PRINTF */ upcri_main_name = main_name; } #if UPCR_DEBUG static upcr_pshared_ptr_t upcri_exitconf = UPCR_NULL_PSHARED; #endif static int upcri_collective_exit = 0; static int upcri_gasnet_exit_called = 0; static int upcri_use_gasnet_exit = 1; // use gasnet_exit for collective exits? static int upcri_atexitcode = 0; /* DOB: the correct default exit value is 0 */ /* the means by which the parallel job spawner merges exit codes from * different nodes is unspecified and system-specific, but often the merge * algorithm ignores zero exit codes in favor of non-zero ones if they are * present. Therefore, when the client has not provided us with an explicit * exit code (signal handler) or we have no way to get it (atexit), we * default to zero and therefore give priority to remote nodes that may have * a "real" exit code from the client, especially in the case of non-collective exits */ /* * Does gasnet_exit if it hasn't been done already by exit() * time (i.e. if upcr_startup_spawn() called with NULL main_function). The * user should have called upcr_finalize() if this is the case. */ static void upcri_atexit_handler(void) { gasnett_flush_streams(); if (!upcri_gasnet_exit_called && upcri_use_gasnet_exit) { upcri_gasnet_exit_called = 1; gasnet_exit(upcri_atexitcode); } } /* SIGQUIT handler that catches remote exits */ void upcri_remoteExitSignalHandler(int sig) { if (sig != SIGQUIT) upcri_err("upcri_remoteExitSignalHandler got an unexpected signal %i", sig); else { gasnett_flush_streams(); /* this may be dangerous here in a signal context... */ upcri_gasnet_exit_called = 1; gasnet_exit(upcri_atexitcode); } } #ifdef GASNET_SEGMENT_EVERYTHING struct upcri_seginfo { void *addr; uintptr_t size; }; struct upcri_seginfo *upcri_seginfos; #if UPCRI_UPC_PTHREADS struct upcri_seginfo **upcri_seginfo_addrs; static gasnett_atomic_t upcri_seginfos_rcvd = gasnett_atomic_init(0); void upcri_SRQ_seginfo(gex_Token_t token, void *info_addr, void *seg_addr, void *seg_size) { gex_Token_Info_t info; gex_Token_Info(token, &info, GEX_TI_SRCRANK); gex_Rank_t srcnode = info.gex_srcrank; upcri_assert(srcnode != upcri_mynode); upcri_seginfo_addrs[(int)srcnode] = info_addr; upcri_seginfos[(int)srcnode].addr = seg_addr; upcri_seginfos[(int)srcnode].size = (uintptr_t)seg_size; gasnett_atomic_increment(&upcri_seginfos_rcvd, GASNETT_ATOMIC_REL); } #endif #endif #if UPCRI_SHARED_THREADS /* Used in upcr_startup_attach() */ static int upcri_cmp_thread_heaps(const void *a, const void *b) { const uintptr_t addr_a = upcri_thread2local[*(const int *)a]; const uintptr_t addr_b = upcri_thread2local[*(const int *)b]; /* 1 if (a > b), -1 if (a < b), and 0 if equal */ return (addr_a > addr_b) - (addr_a < addr_b); } #endif void upcr_startup_attach(uintptr_t default_shared_size, uintptr_t default_shared_offset, int flags) { char buf[64]; uint64_t perthread_req, node_req, heapoffset; gasnet_seginfo_t *seginfos; int err; gex_Rank_t n; int host_threads; int thread = 0; int t; if (upcri_startup_lvl >= upcri_startup_done) return; if (upcri_startup_lvl != upcri_startup_attach) upcri_err("upcr_startup_attach called while upcri_startup_lvl=%d", upcri_startup_lvl); upcri_startup_lvl++; #if UPCR_USING_LINKADDRS { /* Make sure that the compiler doesn't optimize away the check. */ void * volatile p = UPCRL_shared_end; if (!p) upcri_err("end of shared link segment (UPCRL_shared_end) is NULL"); } upcri_linksegstart = (uintptr_t)UPCRL_shared_begin; #endif /* get shared heap size and offset */ if (default_shared_size % UPCR_PAGESIZE) upcri_err( "upcr_startup_attach: default_shared_size %lu not a multiple of UPCR_PAGESIZE", (unsigned long)default_shared_size); if (default_shared_offset % UPCR_PAGESIZE) upcri_err( "upcr_startup_attach: default_shared_offset %lu not a multiple of UPCR_PAGESIZE", (unsigned long)default_shared_offset); /* validate shared heap size parameter */ perthread_req = (flags & UPCR_ATTACH_ENV_OVERRIDE) ? gasnett_getenv_int_withdefault("UPC_SHARED_HEAP_SIZE", default_shared_size, MEGABYTE) : default_shared_size; if (perthread_req < 1024) upcri_err("illegal size for UPC_SHARED_HEAP_SIZE: %s", gasnett_format_number(perthread_req, buf, sizeof(buf), 1)); #if UPCRI_PACKED_SPTR && (UPCRI_ADDR_BITS < 8*SIZEOF_VOID_P) if (perthread_req > ((uintptr_t)1 << UPCRI_ADDR_BITS)) upcri_err("out-of-range size for UPC_SHARED_HEAP_SIZE: %s (sptr representation)", gasnett_format_number(perthread_req, buf, sizeof(buf), 1)); #else if (((uintptr_t)perthread_req) != perthread_req) /* check for size > 32 bits on ILP32 */ upcri_err("out-of-range size for UPC_SHARED_HEAP_SIZE: %s (32-bit ABI)", gasnett_format_number(perthread_req, buf, sizeof(buf), 1)); #endif /* validate shared heap offset parameter */ heapoffset = (flags & UPCR_ATTACH_ENV_OVERRIDE) ? gasnett_getenv_int_withdefault("UPC_SHARED_HEAP_OFFSET", default_shared_offset, MEGABYTE) : default_shared_offset; /* 0 is OK */ if (((uintptr_t)heapoffset) != heapoffset) /* check for size > 32 bits on ILP32 */ upcri_err("out-of-range size for UPC_SHARED_HEAP_OFFSET: %s", gasnett_format_number(heapoffset, buf, sizeof(buf), 1)); if (perthread_req == 0) upcri_err("shared heap size must be > 0"); perthread_req = UPCRI_PAGEALIGNUP(perthread_req); { uintptr_t minsz = MAX(UPCR_PAGESIZE*2, 64*1024); /* On SX-6 pages are 16 MB, and we need at least two pages, * or 64KB (for upcr shared metadata), whichever is larger */ if (perthread_req < minsz ) { upcri_err("illegal value for UPC_SHARED_HEAP_SIZE: %lu " "(must be at least %i KB on this system)", (unsigned long)perthread_req, (int)(minsz/1024)); } } /* gasnet returns an error if we ask for a larger segment than can be * provided, so limit the size of the segment we request to this node's maximum * segment size. * - don't use gasnet_getMaxGlobalSegmentSize, as other nodes may have fewer * pthreads then this one, and thus be OK with a smaller segment. */ node_req = gasnet_getMaxLocalSegmentSize(); if (node_req > perthread_req * upcri_mypthreads()) node_req = perthread_req * upcri_mypthreads(); upcri_assert((uintptr_t)node_req == node_req); /* check for overflow */ #if UPCRI_SYMMETRIC_SEGMENTS /* * Extra work to try to obtain a power-of-two segment if we need one. */ if (UPCRL_segsym_pow2_opt && !UPCRI_IS_POWER_OF_TWO(node_req)) { /* * We use Maxglobal here since symmetric segments are for smp * (all-local) conduits */ uint64_t node_req_upper, node_req_lower, tmp = node_req; int i; /* First try getting the next power of two */ for (i = 0; tmp; i++) { tmp >>= 1; } node_req_upper = 1ULL << i; node_req_lower = 1ULL << (i-1); while (node_req_lower > gasnet_getMaxLocalSegmentSize()) node_req_lower >>= 1; /* Warn if the user requested a non-default shared heap size. */ if (node_req_upper <= gasnet_getMaxLocalSegmentSize()) { upcri_warn("Node %i increasing requested per-node shared heap size (%llu MB)\n" " up to next power of two size: %llu MB", (int)upcri_mynode, (unsigned long long)((perthread_req * upcri_mypthreads())/(1024*1024)), (unsigned long long)(node_req_upper/(1024*1024))); node_req = node_req_upper; } else if (node_req_lower <= gasnet_getMaxLocalSegmentSize()) { upcri_warn("Node %i decreasing requested per-node shared heap size (%llu MB)\n" " down to largest available power of two size: %llu MB", (int)upcri_mynode, (unsigned long long)((perthread_req * upcri_mypthreads())/(1024*1024)), (unsigned long long)(node_req_lower/(1024*1024))); node_req = node_req_lower; } upcri_assert(UPCRI_IS_POWER_OF_TWO(node_req)); perthread_req = node_req / upcri_mypthreads(); upcri_assert(UPCRI_IS_POWER_OF_TWO(perthread_req)); } #endif /* Initial local heap size */ upcri_localheap_initsz = upcri_roundup_pagesz( gasnett_getenv_int_withdefault( "UPC_SHARED_LOCALHEAP_INITSZ", upcri_localheap_initsz, MEGABYTE)); /* Page-align initial global heap size */ upcri_globalheap_initsz = upcri_roundup_pagesz(upcri_globalheap_initsz); upcri_memcheck_all(); /* verify heap sanity */ #ifndef GASNET_SEGMENT_EVERYTHING if ((err = gex_Segment_Attach(&upcri_segment, upcri_tm, node_req))) upcri_gaserr(err, "gex_Segment_Attach failed!"); #endif // TODO: per-subsystem registration is now possible if ((err = gex_EP_RegisterHandlers( upcri_ep, upcri_get_handlertable(), upcri_get_handlertable_count()))) upcri_gaserr(err, "AM handler registration failed!"); UPCRI_SINGLE_BARRIER_NOTHR(); /* TODO: omit or move later (but before first AM) */ seginfos = UPCRI_XMALLOC(gasnet_seginfo_t, upcri_nodes); upcri_nodeinfo = UPCRI_XMALLOC(upcri_nodeinfo_t, upcri_nodes); #if ! UPCRI_SINGLE_ALIGNED_REGIONS upcri_thread2region = UPCRI_XMALLOC(uintptr_t, upcri_threads); upcri_thread2local = UPCRI_XCALLOC(uintptr_t, upcri_threads); #endif #if UPCR_USING_LINKADDRS && !UPCRI_SINGLE_ALIGNED_REGIONS upcri_linkoffset = UPCRI_XMALLOC(uintptr_t, upcri_threads); #endif #if UPCRI_UPC_PTHREADS upcri_pthreadtoinfo = UPCRI_XCALLOC(upcri_pthreadinfo_t *, upcri_mypthreads()); #endif /* Initialize GASNet-level collectives */ upcri_coll_init(); upcri_use_gasnet_exit = gasnett_getenv_yesno_withdefault("GASNET_CATCH_EXIT", 1); if (atexit(&upcri_atexit_handler)) upcri_err("atexit() failed!"); /* register a SIGQUIT handler, as recommended by the GASNet spec */ { upcri_sighandlerfn_t fpret = (upcri_sighandlerfn_t)signal(SIGQUIT, upcri_remoteExitSignalHandler); if (fpret == (upcri_sighandlerfn_t)SIG_ERR) upcri_err("Got a SIG_ERR while registering SIGQUIT handler"); } // Get Host and Nbdrh info used for various initialization purposes gex_System_QueryHostInfo(&upcri_my_host, &upcri_my_host_size, NULL); #if UPCRI_USING_PSHM gex_System_QueryNbrhdInfo(&upcri_my_nbrhd, &upcri_my_nbrhd_size, NULL); #endif // Build nodeinfo, required for bupc_thread_distance() // // TODO: BUPC_THREADS_NEAR never arises for the common case of PSHM and Nbrhd==host. // Even for job configs where it can arise, it's arguably not a useful // performance distinction. If we lumped that "same node, diff nbrhd" // relationship into BUPC_THREADS_VERYFAR, that would allow us to remove // the `host` field from upcri_nodeinfo, improving memory scalability. If // we wanted to take this a step further we could introduce an envvar to // declare that bupc_thread_distance() precision is unneeded, allowing us // to skip this table allocation entirely and the call could use a trivial // implementation that returns only BUPC_THREADS_SAME or // BUPC_THREADS_VERYFAR (for all unequal threads) { upcri_nodeinfo_t my_nodeinfo; #if UPCRI_USING_PSHM gex_Rank_t *nbrhd_ptr = &my_nodeinfo.nbrhd; #else gex_Rank_t *nbrhd_ptr = NULL; #endif gex_System_QueryMyPosition(NULL, nbrhd_ptr, NULL, &my_nodeinfo.host); gasnet_coll_gather_all(GASNET_TEAM_ALL, upcri_nodeinfo, &my_nodeinfo, sizeof(upcri_nodeinfo_t), GASNET_COLL_LOCAL|GASNET_COLL_IN_MYSYNC|GASNET_COLL_OUT_MYSYNC); } if (!upcr_getenv("UPC_QUIET")) upcri_startup_messages(); if ((gasnett_getenv_withdefault("UPC_FREEZE_EARLY", NULL)) != NULL) { upcri_startup_freeze_early(atoi(upcr_getenv("UPC_FREEZE_EARLY"))); } if (gasnet_getSegmentInfo(seginfos, upcri_nodes) != GASNET_OK) upcri_err("unable to get GASNet segment information: aborting"); #ifdef GASNET_SEGMENT_EVERYTHING /* Allocate a shared heap * TODO: Use valloc(), posix_memalign(), etc when available? * TODO: More friendly message on allocation failure. */ { uint8_t *heap_req = UPCRI_XMALLOC(uint8_t, node_req + UPCR_PAGESIZE); void *heap = (void*)UPCRI_PAGEALIGNUP(heap_req); struct upcri_seginfo *myseginfo; upcri_seginfos = UPCRI_XMALLOC(struct upcri_seginfo, upcri_nodes); myseginfo = upcri_seginfos + upcri_mynode; myseginfo->addr = heap; myseginfo->size = node_req; #if !UPCRI_UPC_PTHREADS /* Gather-all via the corresponding gasnet collective */ gasnet_coll_gather_all(GASNET_TEAM_ALL, upcri_seginfos, myseginfo, sizeof(struct upcri_seginfo), GASNET_COLL_LOCAL|GASNET_COLL_IN_MYSYNC|GASNET_COLL_OUT_MYSYNC); #else /* Gather-all via AM-based gather and PUT-based flat broadcast. We can't use gasnet_coll_gather_all() because the "images" support need for pthreads is no longer supported. Even when it was, this exchange occurs prior to the necessary initialization. TODO: can we use something more scalable? */ if (!upcri_mynode) upcri_seginfo_addrs = UPCRI_XMALLOC(struct upcri_seginfo *, upcri_nodes); UPCRI_SINGLE_BARRIER_NOTHR(); if (upcri_mynode) { /* Send my info to 0 */ gex_AM_RequestShort(upcri_tm, 0, UPCRI_HANDLER_ID(upcri_SRQ_seginfo), 0, UPCRI_SEND_PTR(upcri_seginfos), UPCRI_SEND_PTR(myseginfo->addr), UPCRI_SEND_PTR((void*)myseginfo->size)); } else { /* wait for all arrivals */ GASNET_BLOCKUNTIL((int)gasnett_atomic_read(&upcri_seginfos_rcvd, 0) == (upcri_nodes - 1)); /* Bcast, excluding self */ for (n = 1; n < upcri_nodes; ++n) { /* Bcast, excluding self */ gex_RMA_PutNBI(upcri_tm, n, upcri_seginfo_addrs[(int)n], upcri_seginfos, upcri_nodes*sizeof(struct upcri_seginfo), GEX_EVENT_DEFER, GEX_FLAG_PEER_NEVER_SELF); } upcri_free(upcri_seginfo_addrs); gex_NBI_Wait(GEX_EC_PUT,0); } UPCRI_SINGLE_BARRIER_NOTHR(); #endif for (n = 0; n < upcri_nodes; ++n) { upcri_assert(!((uintptr_t)upcri_seginfos[(int)n].addr % UPCR_PAGESIZE)); upcri_assert(!(upcri_seginfos[(int)n].size % UPCR_PAGESIZE)); seginfos[(int)n].addr = upcri_seginfos[(int)n].addr; seginfos[(int)n].size = upcri_seginfos[(int)n].size; } upcri_free(upcri_seginfos); } #endif { gex_Rank_t lead_node = 0; int local_cpus = gasnett_cpu_count(); int politedefault; #if UPCRI_UPC_PTHREADS int threads_serialized = 0; #if PLATFORM_OS_OPENBSD /* May be running "uthreads" which serializes all threads to a single core */ if (upcri_mypthreads() > 1) { pthread_attr_t attr; pthread_attr_init(&attr); threads_serialized = (0 != pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)); } #endif #endif host_threads = 0; for (gex_Rank_t i = 0; i < upcri_my_host_size; ++i) { n = upcri_my_host[i].gex_jobrank; if (!host_threads) lead_node = n; host_threads += upcri_pthreads(n); } politedefault = (local_cpus > 0) && (host_threads > local_cpus); #if UPCRI_UPC_PTHREADS politedefault |= threads_serialized; #endif upcri_polite_wait = gasnett_getenv_yesno_withdefault("UPC_POLITE_SYNC",politedefault); if (politedefault && (upcri_mynode == lead_node)) { #if UPCRI_UPC_PTHREADS if (threads_serialized) fprintf(stderr,"WARNING: Process(es) on host %s running more threads (%i) than platform's supported concurrency\n", gasnett_gethostname(), upcri_mypthreads()); #endif if ((local_cpus > 0) && (host_threads > local_cpus)) fprintf(stderr,"WARNING: Host %s running more threads (%i) than there are physical CPU's (%i)\n", gasnett_gethostname(), host_threads, local_cpus); if (upcri_polite_wait) { fprintf(stderr," enabling \"polite\", low-performance synchronization algorithms\n"); } else { fprintf(stderr," but setting UPC_POLITE_SYNC=\"%s\" in your environment has\n" " disabled \"polite\", low-performance synchronization algorithms\n" " Results of this run are not suitable for benchmarking\n", gasnet_getenv("UPC_POLITE_SYNC")); } } else if (upcri_polite_wait && upcri_mynode == 0) { fprintf(stderr,"WARNING: UPC_POLITE_SYNC=\"%s\" is set in your environment\n" " enabling \"polite\", low-performance synchronization algorithms\n", gasnet_getenv("UPC_POLITE_SYNC")); } fflush(stderr); } gasnet_set_waitmode( upcri_polite_wait ? GASNET_WAIT_BLOCK : GASNET_WAIT_SPIN ); if (gasnett_getenv_yesno_withdefault("UPC_REQUIRE_SHARED_SIZE", flags & UPCR_ATTACH_REQUIRE_SIZE)) { for (n = 0; n < upcri_nodes; n++) { if (seginfos[n].size < (perthread_req * upcri_pthreads(n)) ) { if (upcri_mynode == 0) { upcri_err( "Unable to allocate sufficient shared memory: aborting.\n" "\n" "This application requested %lu MB of shared memory per thread (starting\n" "at a minimum offset of %lu MB from the start of the regular heap).\n" "However, node %d was only able to allocate %lu MB of shared memory.\n" "\n" "The default amount of shared memory requested at startup is controlled\n" "by the 'shared_heap' parameter in the global upcc.conf file or in your\n" "$HOME/.upccrc file, or by the '-shared-heap' option to the upcc command.\n" "\n" "To override the default, you may use the '-shared-heap' option to upcrun,\n" "or set the UPC_SHARED_HEAP_SIZE or UPC_SHARED_HEAP_OFFSET environment variables.\n", (unsigned long)perthread_req/MEGABYTE, (unsigned long)heapoffset/MEGABYTE, (int)n, (unsigned long)(seginfos[n].size/MEGABYTE)); } // other nodes will see non-collective exit no later than next barrier } } upcri_perthread_segsize = upcri_rounddown_pagesz(perthread_req); } else { /* use smallest region's size (per thread) for all */ uintptr_t smallest_perthread = seginfos[0].size / upcri_pthreads(0); gex_Rank_t smallest_node = 0; for (n = 1; n < upcri_nodes; n++) { uintptr_t perthread = seginfos[n].size / upcri_pthreads(n); if (perthread < smallest_perthread) { smallest_perthread = perthread; smallest_node = n; } } if (smallest_perthread < perthread_req) { if (gasnett_getenv_yesno_withdefault("UPC_SIZE_WARN",flags & UPCR_ATTACH_SIZE_WARN)) { if (upcri_mynode == smallest_node) { upcri_warn( "Requested shared memory (%lu MB) > available (%lu MB) " "on node %d (%s): using %lu MB per thread instead", (unsigned long)(perthread_req/MEGABYTE * upcri_pthreads(smallest_node)), (unsigned long)(smallest_perthread/MEGABYTE * upcri_pthreads(smallest_node)), (int)smallest_node, gasnett_gethostname(), smallest_perthread/MEGABYTE); } } } upcri_perthread_segsize = upcri_rounddown_pagesz(smallest_perthread); } #if !UPCRI_UPC_PTHREADS upcri_myregion_single = (uintptr_t)seginfos[upcri_mynode].addr; #if UPCR_USING_LINKADDRS upcri_linkoffset_single = (upcri_myregion_single + UPCR_PAGESIZE) - upcri_linksegstart; #endif #endif for (n = 0; n < upcri_nodes; n++) { #if UPCRI_SINGLE_ALIGNED_REGIONS #if 0 printf("T%d: Thread %d's region starts at %p\n", upcri_mynode, n, seginfos[n].addr); #endif if (seginfos[n].addr != seginfos[0].addr) { upcri_err("got unaligned segments with GASNET_ALIGNED_SEGMENTS"); } #else /* !UPCRI_SINGLE_ALIGNED_REGIONS */ #if UPCRI_SYMMETRIC_SEGMENTS if (n > 1) { int p0 = n-2, p1 = n-1; if ((uintptr_t)seginfos[n].addr - (uintptr_t)seginfos[p1].addr != (uintptr_t)seginfos[p1].addr - (uintptr_t)seginfos[p0].addr) upcri_err( "Nodes %d/%d and %d/%d have not allocated " "shared heaps symmetrically\n", n, p1, p1, p0); } #endif for (t = 0; t < upcri_pthreads(n); t++) { uintptr_t region = ((uintptr_t)seginfos[n].addr) + (t * upcri_perthread_segsize); upcri_thread2region[thread] = region; #if 0 printf("Thread %d's segment is %p -> %p\n", thread-1, (void*)upcri_thread2region[thread-1], (void*)(upcri_thread2region[thread-1] + upcri_perthread_segsize)); #endif #if UPCRI_SYMMETRIC_SEGMENTS if (t > 1) { int t2 = t, t1 = t-1, t0 = t-2; if (upcri_thread2region[t2] - upcri_thread2region[t1] != upcri_thread2region[t1] - upcri_thread2region[t0]) upcri_err("Pthread segments are not symmetric"); } #endif #if UPCR_USING_LINKADDRS upcri_linkoffset[thread] = (region + UPCR_PAGESIZE) - upcri_linksegstart; #if 0 printf("Thread %d's upcri_linkoffset=%p (link seg=%p)\n", thread, upcri_linkoffset[thread], upcri_linksegstart); #endif #endif thread++; } #endif } #if ! UPCRI_SINGLE_ALIGNED_REGIONS /* set up upcri_thread2local to return NULL for remote regions, or * address if local. */ #if UPCRI_UPC_PTHREADS { int maxthread; t = upcri_1stthread(upcri_mynode); maxthread = t + upcri_mypthreads() - 1; for ( ; t <= maxthread; ++t) { upcri_thread2local[t] = 0 UPCRI_PLUS_REMOTE_OFFSET(t); } #if UPCRI_USING_PSHM for (gex_Rank_t i = 0; i < upcri_my_nbrhd_size; ++i) { n = upcri_my_nbrhd[i].gex_jobrank; if (n != upcri_mynode) { void *owner_addr; void *local_addr; gex_Event_Wait( gex_EP_QueryBoundSegmentNB(upcri_tm, n, &owner_addr, &local_addr, NULL, 0) ); uintptr_t offset = (uintptr_t)local_addr - (uintptr_t)owner_addr; int fthr = upcri_1stthread(n); maxthread = fthr + upcri_pthreads(n) - 1; for (t = fthr; t <= maxthread; ++t) { upcri_thread2local[t] = offset UPCRI_PLUS_REMOTE_OFFSET(t); } } } #endif } #elif UPCRI_SYMMETRIC_SEGMENTS /* configs with load/store access to remote shared memory - global pointers, etc. */ for (n = 0; n < upcri_nodes; n++) { upcri_thread2local[n] = 0 UPCRI_PLUS_REMOTE_OFFSET(n); } #else /* !UPCRI_SYMMETRIC_SEGMENTS && !UPCRI_UPC_PTHREADS */ /* unaligned segment platforms with exactly one thread per node */ upcri_thread2local[upcri_mynode] = 0 UPCRI_PLUS_MY_OFFSET; #if UPCRI_USING_PSHM for (gex_Rank_t i = 0; i < upcri_my_nbrhd_size; ++i) { n = upcri_my_nbrhd[i].gex_jobrank; if (n != upcri_mynode) { void *owner_addr; void *local_addr; gex_Event_Wait( gex_EP_QueryBoundSegmentNB(upcri_tm, n, &owner_addr, &local_addr, NULL, 0) ); upcri_thread2local[n] = (uintptr_t)local_addr - (uintptr_t)owner_addr UPCRI_PLUS_REMOTE_OFFSET(n); } } #endif #endif #if UPCRI_SYMMETRIC_SEGMENTS upcri_segsym_base = (char *) upcri_thread2local[0]; /* Simply use region between 1 and 0, since we've already * checked above that all regions are symmetric */ if (upcri_threads > 1) upcri_segsym_region_size = (ptrdiff_t) (upcri_thread2local[1] - upcri_thread2local[0]); else upcri_segsym_region_size = upcri_perthread_segsize; upcri_segsym_size = (ptrdiff_t) (upcri_segsym_region_size * upcri_threads); upcri_segsym_end = upcri_segsym_base + upcri_segsym_size; #if 0 printf("segsym %p .. %p of size %ld\n", upcri_segsym_base, upcri_segsym_end, (u_long) upcri_segsym_size); #endif /* * Extra checks if our power of two optimization is on */ if (UPCRL_segsym_pow2_opt) { int i; int bits = sizeof(uintptr_t)*8; if (!UPCRI_IS_POWER_OF_TWO(upcri_threads)) upcri_err("Symmetric Pointer support compiled with Power-of-two " "optimizations and executed with a non power-of-two " "number of threads\n"); if (!UPCRI_IS_POWER_OF_TWO(upcri_segsym_size)) upcri_err("Symmetric Pointer support compiled with Power-of-two " "optimizations and the allocated symmetric segment is " "not a power-of-two (%ld bytes or 0x%lx)\n", upcri_segsym_size, upcri_segsym_size); upcri_assert(UPCRI_IS_POWER_OF_TWO(upcri_segsym_region_size)); upcri_segsym_size_mask = upcri_segsym_size - 1; upcri_segsym_region_size_mask = upcri_segsym_region_size - 1; for (i=0; imypthread; #endif mycpu = local_cpus ? (local_rank % local_cpus) : local_rank; UPCRI_TRACE_PRINTF(("Binding UPC thread %d to CPU %d", (int)pargs->mythread, mycpu)); gasnett_set_affinity(mycpu); } #if UPCRI_UPC_PTHREADS /* Allocation done by upcc-generated linker function, since size of the * TLD data can only be known at application link time */ _upcr_pthreadinfo = upcri_linkergenerated_tld_init(); _upcr_pthreadinfo->mythread = pargs->mythread; _upcr_pthreadinfo->mypthread = pargs->mypthread; _upcr_pthreadinfo->mygasnet_tinfo = GASNET_GET_THREADINFO(); _upcr_pthreadinfo->myregion = upcri_thread2region[pargs->mythread]; #if UPCR_USING_LINKADDRS _upcr_pthreadinfo->link_offset = upcri_linkoffset[pargs->mythread]; #endif gasnett_threadkey_set(upcri_pthread_key, _upcr_pthreadinfo); upcri_pthreadtoinfo[pargs->mypthread] = _upcr_pthreadinfo; memset(upcri_auxdata(), 0, sizeof(*upcri_auxdata())); /* zero-init all aux tld */ upcri_auxdata()->barrier_info.barrier_args = upcri_barrier_args + pargs->mypthread; upcri_pthread_barrier(); #endif #if UPCR_DEBUG upcri_auxdata()->stack_info.hot = (char *)&exitval; upcri_auxdata()->stack_info.cold = (char *)&exitval; #endif upcri_auxdata()->cpu = mycpu; /* Per-thread initialization of collectives */ upcri_coll_init_thread(); #ifndef UPCRI_SUPPRESS_SIGFPE #if PLATFORM_ARCH_ALPHA || PLATFORM_ARCH_CRAYT3E || PLATFORM_ARCH_CRAYX1 /* DOB: Setup a trap handler for SIGFPE (floating point exception) to keep it from crashing us The Alpha CPU delivers a floating point exception leading to SIGFPE whenever a floating point exception occurs, and it's up to the OS to clean things up - but it can only properly do so if we compiled the code with -ieee (Compaq C), which makes all FP ops restartable but also lowers performance (more info, see man ieee(3)). If we compile with -ieee, the OS ensures we get all the right IEEE FP results, even with SIGFPE ignored (but we take a performance hit). Without -ieee, the SIGFPE is usually fatal because the results are not well-defined. However, experiments (on a 21264A) indicate that if you ignore the signal, exceptional operations still correctly produce NaN's and Infinity's, and the only thing we lose is denormalized IEEE FP results instead become zeroes (which we can live with by default, and users who care can add -ieee to the C flags). Same issue on Cray X1 (bug 1840) */ #define UPCRI_SUPPRESS_SIGFPE 1 #else #define UPCRI_SUPPRESS_SIGFPE 0 #endif #endif if (gasnett_getenv_yesno_withdefault("UPC_SUPPRESS_SIGFPE",UPCRI_SUPPRESS_SIGFPE)) { if (signal(SIGFPE, SIG_IGN) == SIG_ERR) upcri_errno("failed to signal(SIGFPE, SIG_IGN)"); } /* * Layout of each thread's portion of shared memory: * 1) First page unused: catch NULL shared ptr refs in DEBUG mode * 2) Static data: * 3) Cache blocks * 4) Heap area (for both local/global shared heaps) */ myaddr = upcri_myregion(); heapoffset = UPCR_PAGESIZE + pargs->static_data_size + upcri_cache_size; upcri_perthread_heapsize = upcri_perthread_segsize - heapoffset; /* Validate heap size */ uintptr_t heap_initsz = upcri_localheap_initsz + upcri_globalheap_initsz; if (upcri_perthread_heapsize <= heap_initsz) { if(!upcr_mythread()) { upcri_err("Per-thread shared heap size of %lu MB is too small by at least %lu MB.\n", (unsigned long)(upcri_perthread_segsize/MEGABYTE), (unsigned long)(MAX(1,(heap_initsz - upcri_perthread_heapsize)/MEGABYTE))); } // other UPC threads will see non-collective exit no later than next barrier } /* Hook for arbitrary per-pthread initializations */ if (pargs->spawnfuncs->per_pthread_init != NULL) pargs->spawnfuncs->per_pthread_init(); #if UPCR_USING_CACHING /* initialize cache */ pargs->spawnfuncs->cache_init( (void *)(myaddr + UPCR_PAGESIZE + pargs->static_data_size), upcri_cache_size); #endif /* run shared heap initialization function */ pargs->spawnfuncs->heap_init( (void *)(myaddr+heapoffset), upcri_perthread_heapsize); /* run static data initialization function */ pargs->spawnfuncs->static_init( (void *)(myaddr + UPCR_PAGESIZE), pargs->static_data_size); /* Ensure logging of the following operations are attributed correctly */ UPCR_SET_SRCPOS("_STARTUP",0); /* init upc_lock library */ upcri_locksystem_init(); upcri_rand_init(); upcri_clock_init(); upcri_init_extern(); #if UPCR_DEBUG /* metadata used to provide extra barrier safety at the final barrier, * to help catch buggy programs */ { upcr_shared_ptr_t tmp = upcr_all_alloc(1, sizeof(int)*upcr_threads()); upcri_exitconf = upcr_shared_to_pshared(tmp); if (upcr_mythread() == 0) { memset(upcr_pshared_to_local(upcri_exitconf), -1, sizeof(int)*upcr_threads()); } } #endif #if GASNET_TRACE /* Honor GASNet's tracelocal setting in our own local tracing */ upcri_trace_suppresslocal = !gasnett_getenv_yesno_withdefault("GASNET_TRACELOCAL",1); /* Generate TRACE_MAGIC if tracing is enabled (at build time) and requested (at run time) */ { const char *trace_id; char tmp[255]; const char *hostname = gasnett_gethostname(); trace_id = upcr_getenv("UPC_TRACE_ID"); if (!trace_id) { /* no id available from upcrun, so have thread 0 generate one based on hostname and pid */ uint64_t id = 0; if (upcr_mythread() == 0) { int i; id = getpid(); for (i = 0; hostname[i]; i++) { id = id ^ (((uint64_t)(uint8_t)hostname[i]) << (8*(i % 8))); } } upcri_broadcast(0, &id, 8); snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)id); trace_id = tmp; } GASNETT_TRACE_PRINTF_FORCE( "GASNET_TRACE_MAGIC: I am thread %d of %d, on node %d of %d (%s) in job ", (int)upcr_mythread(), (int)upcr_threads(), (int)upcri_mynode, (int)upcri_nodes, hostname, trace_id); } #endif if (!upcri_mypthread()) upcri_memcheck_all(); /* verify heap sanity */ #if PLATFORM_COMPILER_XLC printf(""); /* bug1506: workaround for thread-safety bug in atof */ #endif #ifdef PLATFORM_OS_CYGWIN /* bug 2438/1847: cygwin's threaded I/O library is not reliably synchronized, so try to reduce the incidence of I/O-related hangs by invoking some relevant I/O early from a single thread. Note the I/O library appears to be lazily synchronized, so this hack is ineffective before the first call to pthread_create. */ if (!upcri_mypthread()) { const char *empty = ""; /* avoid a gcc warning for printfing an empty string */ fprintf(stdout, empty); /* init puts() internal locks */ fprintf(stderr, empty); gasnett_flush_streams(); /* init fflush() internal locks */ } #endif /* Barrier, so inits complete on all threads before running user code */ UPCRI_SINGLE_BARRIER(); /* init GASP instrumentation tool, which may run some UPC code callbacks */ upcri_pevt_init(&(pargs->argc), &(pargs->argv)); /* Freeze thread and block others in barrier if user wants to attach a * debugger */ { freezestr = gasnet_getenv("UPC_FREEZE"); if (!upcr_mythread()) gasnett_envstr_display("UPC_FREEZE", freezestr, !freezestr); if (freezestr) upcri_startup_freeze(pargs, atoi(freezestr)); } if (!upcri_mypthread()) upcri_memcheck_all(); /* verify heap sanity */ if (pargs->spawnfuncs->main_function) { /* Run user's application */ exitval = pargs->spawnfuncs->main_function(pargs->argc, pargs->argv); upcri_do_exit(exitval); } return NULL; } #if UPCRI_SUPPORT_PTHREADS /* Thread continuously polls the network, to ensure progress. * Not a good idea if you want performance, but was needed for Totalview. * Retained as harmless, for now. */ static int upcri_tv_progress_verbose = 0; /* for debugging TV support */ static pthread_t upcri_tv_progress_thread; static int upcri_tv_progress_exit = 0; void *upcri_run_progress_thread(void *ignoreme) { while (! upcri_tv_progress_exit) { if (upcri_tv_progress_verbose) { static int verbose_slowdown = 0; static int monotonic = 1; if (!verbose_slowdown) { printf("Progress thread in process %d running %d...\n", (int)upcri_mynode, monotonic++); fflush(stdout); } if (verbose_slowdown++ == 100) verbose_slowdown = 0; } /* We need a yield, not a sleep() here, as sleep will limit the number * of AM requests we can handle to 1/sec if the rest of the process is * stopped by Totalview */ gasnett_sched_yield(); upcr_poll_nofence(); } return NULL; } pthread_t upcri_create_thread(void *(*start_routine)(void *), void *arg) { pthread_t t; pthread_attr_t attr; int retval; if ((errno=pthread_attr_init(&attr))) upcri_errno("pthread_attr_init"); if ((retval = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM))) { UPCRI_TRACE_PRINTF(("pthread_attr_setscope(PTHREAD_SCOPE_SYSTEM) => %s(%i)", strerror(retval), retval)); #if !PLATFORM_OS_CYGWIN && !PLATFORM_OS_FREEBSD && !PLATFORM_OS_OPENBSD && !PLATFORM_OS_IRIX /* ignore known failures on some OS's */ errno = retval; upcri_errno("pthread_attr_setscope"); #endif } if (upcri_stacksz) { if ((errno=pthread_attr_setstacksize(&attr, upcri_stacksz))) upcri_errno("pthread_attr_setstacksize"); } /* Cygwin 2.x has been seen to crash in pthread_attr_{set,get}guardsize(). * TODO: a stronger configure probe would be a better alternative. */ #if PLATFORM_OS_CYGWIN #undef HAVE_PTHREAD_ATTR_SETGUARDSIZE #endif #ifdef HAVE_PTHREAD_ATTR_SETGUARDSIZE /* set a guard page, if supported (ignore errors) */ pthread_attr_setguardsize(&attr, UPCR_PAGESIZE); #endif if ((errno=pthread_create(&t, &attr, start_routine, arg))) upcri_errno("pthread_create"); if ((errno=pthread_attr_destroy(&attr))) upcri_errno("pthread_attr_destroy"); return t; } #endif void upcr_startup_spawn(int *pargc, char ***pargv, uintptr_t static_data_size, uintptr_t default_cache_size, struct upcr_startup_spawnfuncs *spawnfuncs) { struct upcri_spawn_arg mainarg; if (upcri_startup_lvl >= upcri_startup_done) return; if (upcri_startup_lvl != upcri_startup_spawn) upcri_err("upcr_startup_spawn called while upcri_startup_lvl=%d", upcri_startup_lvl); upcri_startup_lvl++; /* round up static and cache sizes to page size */ static_data_size = upcri_roundup_pagesz(static_data_size); default_cache_size = upcri_roundup_pagesz(default_cache_size); /* Hook for arbitrary per-process initialization */ if (spawnfuncs->pre_spawn_init != NULL) spawnfuncs->pre_spawn_init(); #ifdef UPCRI_UPC_PTHREADS if (!spawnfuncs->main_function) upcri_err("upcr_startup_spawn cannot be called with a NULL " "main_function under pthreads"); #endif if (UPCRL_mpi_init != NULL) UPCRL_mpi_init(pargc, pargv); #if UPCRI_SUPPORT_PTHREADS if (UPCRL_progress_thread) { upcri_tv_progress_verbose = gasnett_getenv_yesno_withdefault("UPC_TV_PROGRESS_VERBOSE", 0); upcri_tv_progress_thread = upcri_create_thread(&upcri_run_progress_thread, NULL); } #endif #ifdef UPCRI_UPC_PTHREADS #ifdef PTHREAD_STACK_MIN #define UPCRI_PTHREAD_STACK_MIN PTHREAD_STACK_MIN #else #define UPCRI_PTHREAD_STACK_MIN UPCR_PAGESIZE #endif gasnett_threadkey_init(upcri_pthread_key); { /* find a suitable stack size for pthreads we're creating, with optional user intervention */ pthread_attr_t attr; size_t stack_pad, stack_dflt; if ((errno=pthread_attr_init(&attr))) upcri_errno("pthread_attr_init"); upcri_stacksz = gasnett_getenv_int_withdefault("UPC_STACK_SIZE", UPCRI_STACK_DEFAULT, 1); upcri_stacksz = MAX(upcri_stacksz, UPCRI_PTHREAD_STACK_MIN); if ((errno=pthread_attr_getstacksize(&attr, &stack_dflt))) upcri_errno("pthread_attr_getstacksize"); stack_pad = gasnett_getenv_int_withdefault("UPC_STACK_PAD", 0, 1); upcri_stacksz = MAX(upcri_stacksz, (stack_dflt + stack_pad)); upcri_stacksz = UPCRI_PAGEALIGNUP(upcri_stacksz); while (pthread_attr_setstacksize(&attr, upcri_stacksz) == EINVAL) { /* TODO: binary search? */ if (upcri_stacksz <= UPCRI_PTHREAD_STACK_MIN) { UPCRI_TRACE_PRINTF(("pthread stack size search failed")); upcri_stacksz = 0; break; } else upcri_stacksz /= 2; } if (upcri_stacksz) UPCRI_TRACE_PRINTF(("pthread stack size set to %lu", (unsigned long)upcri_stacksz)); #if UPCR_DEBUG if (upcri_stacksz) upcri_stacksz_threshhold = 0.95*upcri_stacksz; #endif if ((errno=pthread_attr_destroy(&attr))) upcri_errno("pthread_attr_destroy"); } { /* launch pthreads */ int p; for (p = 1; p < upcri_mypthreads(); p++) { struct upcri_spawn_arg *arg; int i; /* make copy of command line args for each pthread */ arg = UPCRI_XMALLOC(struct upcri_spawn_arg, 1); arg->argc = *pargc; arg->argv = UPCRI_XMALLOC(char*, *pargc); for (i = 0; i < *pargc; i++) arg->argv[i] = strdup((*pargv)[i]); arg->mypthread = p; arg->mythread = upcri_1stthread(upcri_mynode) + p; arg->static_data_size = static_data_size; arg->default_cache_size = default_cache_size; arg->spawnfuncs = UPCRI_XMALLOC(struct upcr_startup_spawnfuncs, 1); *arg->spawnfuncs = *spawnfuncs; upcri_create_thread(&upcri_perthread_spawn, arg); } } #ifdef HAVE_PTHREAD_SETCONCURRENCY { /* give pthread library a hint about how many CPU's we want */ int th = upcri_mypthreads(); int retval = 0; /* some pthread implementations (eg FreeBSD) return EAGAIN if hint is too large */ while (th > 1 && (retval = pthread_setconcurrency(th)) == EAGAIN) { UPCRI_TRACE_PRINTF(("pthread_setconcurrency(%i) => %s(%i)", th, strerror(retval), retval)); th--; } if (retval && retval != EAGAIN) { errno = retval; upcri_errno("pthread_setconcurrency"); } } #endif /* first pthread on each node gets original thread */ mainarg.mypthread = 0; #endif mainarg.mythread = upcri_1stthread(upcri_mynode); mainarg.argc = *pargc; mainarg.argv = *pargv; mainarg.static_data_size = static_data_size; mainarg.default_cache_size = default_cache_size; mainarg.spawnfuncs = spawnfuncs; upcri_perthread_spawn(&mainarg); } /* (optionally timed) final barrier - returns non-zero on timeout */ static int upcri_finalbarrier(uint64_t time_limit) { UPCR_BEGIN_FUNCTION(); #if UPCR_DEBUG if ( 0 == upcri_auxdata()->finalbarrier_count++ ) { /* send a final barrier confirmation to zero containing the current phase */ int tmp = upcri_auxdata()->barrier_info.barrier_phase; upcr_put_pshared_strict(upcri_exitconf, sizeof(int)*upcr_mythread(), &tmp, sizeof(int)); } #endif upcr_notify(0, UPCR_BARRIERFLAG_ANONYMOUS); if (time_limit) { gasnett_tick_t start = gasnett_ticks_now(); while (!upcr_try_wait(0, UPCR_BARRIERFLAG_ANONYMOUS)) { if (gasnett_ticks_to_us(gasnett_ticks_now() - start) > time_limit) return 1; gasnet_AMPoll(); gasnett_sched_yield(); } } else { upcr_wait(0, UPCR_BARRIERFLAG_ANONYMOUS); } return 0; } #if UPCR_DEBUG static void upcri_check_finalbarrier(void) { UPCR_BEGIN_FUNCTION(); int i; int threads = upcr_threads(); int mythread = upcr_mythread(); int minthread = mythread; int myphase = 1 & (upcri_auxdata()->barrier_info.barrier_phase ^ upcri_auxdata()->finalbarrier_count); int *tmp = upcri_checkmalloc(sizeof(int)*threads); char *desc = upcri_checkmalloc(threads*10); char *p = desc; upcr_get_pshared_strict(tmp, upcri_exitconf, 0, sizeof(int)*threads); for (i = 0; i < threads; i++) { if (tmp[i] == -1) { /* no arrival: threads are still running in user code */ snprintf(p, 10, "%i ", i); p += strlen(p); } else if (tmp[i] != myphase) { /* late exit: threads reached final barrier in a different * phase than us, one barrier behind us */ snprintf(p, 10, "%i ", i); p += strlen(p); } else if (i < minthread) { minthread = i; } } if ((p != desc) && (minthread == mythread)) upcri_err("Early exit detected: the following threads did not " "reach the final implicit barrier: %s", desc); upcri_free(tmp); upcri_free(desc); } #endif /* * Polite exit, with final barrier. */ void upcr_exit(int exitcode) { UPCR_BEGIN_FUNCTION(); upcri_do_exit(exitcode); } /* * Terminate program on all threads and nodes */ void upcr_global_exit(int exitcode) { UPCRI_PTHREADINFO_LOOKUPDECL_IFINST(); UPCRI_TRACE_PRINTF(("upc_global_exit(%i)", exitcode)); if (!upcri_collective_exit) { #define UPCRI_PEVT_ARGS , exitcode upcri_pevt_atomic(GASP_UPC_NONCOLLECTIVE_EXIT); #undef UPCRI_PEVT_ARGS gasnett_flush_streams(); } /* else: caller did upcri_pevt_atomic(GASP_UPC_COLLECTIVE_EXIT) and flush */ #if UPCRI_SUPPORT_PTHREADS if (UPCRL_progress_thread) { upcri_tv_progress_exit = 1; pthread_join(upcri_tv_progress_thread, NULL); } #endif if (!upcri_gasnet_exit_called) upcri_atexitcode = exitcode; if (UPCRL_profile_finalize != NULL) { #ifdef UPCRI_SUPPORT_PTHREADS static gasnett_mutex_t tmp_lock = GASNETT_MUTEX_INITIALIZER; gasnett_mutex_lock(&tmp_lock); if (UPCRL_profile_finalize != NULL) { #endif if (upcri_nodes > 1 && !upcr_getenv("UPC_QUIET")) upcri_warn("Node %i called upc_global_exit while profiling. " "For most reliable profiling results, all threads should call exit() or return from main().", (int)upcri_mynode); UPCRL_profile_finalize(); UPCRL_profile_finalize = NULL; gasnett_flush_streams(); #ifdef UPCRI_SUPPORT_PTHREADS } gasnett_mutex_unlock(&tmp_lock); #endif } upcri_gasnet_exit_called = 1; gasnet_exit(exitcode); } /* * Do final barrier before exiting in normal circumstances */ void upcri_do_exit(int exitval) { UPCR_BEGIN_FUNCTION(); /* Set source line info so the final barrier doesn't get logged with * the same file/line as the last logged operation in user code. */ UPCR_SET_SRCPOS("_FINAL_BARRIER",0); #define UPCRI_PEVT_ARGS , exitval upcri_pevt_start(GASP_UPC_COLLECTIVE_EXIT); /* must flush before reaching final barrier to prevent a race * where another thread kills us before we reach the flush in * gasnet_exit() (bug 410) */ gasnett_flush_streams(); /* Perform final barrier (to guarantee that processes don't go away before * other processes are finished using their shared memory). * we know the job is about to exit, so here we use an anonymous barrier * that will match any user barrier in the case of an early-exit bug, * and use the detection logic below to issue a nice error message if * that's the case (in a debug build only) * XXX: When GASNet interfaces are ready, a team reduction w/ timeout might * be more scalable and would not interact w/ the UPC barrier. */ (void) upcri_finalbarrier(0); upcri_pevt_end(GASP_UPC_COLLECTIVE_EXIT); #undef UPCRI_PEVT_ARGS #if UPCRI_GASP upcri_pthread_barrier(); /* disable further instrumentation callbacks to prevent confusing the tool */ upcri_pevt_isinit = 0; if (upcri_nodes > 1) { /* wait for all nodes to finish finalizing GASP */ (void) upcri_finalbarrier(0); } #endif /* Call user exit functions, if any (once per process) */ if (upcri_mypthread() == 0 && UPCRL_mpi_finalize != NULL) { UPCRL_mpi_finalize(); } if (UPCRL_profile_finalize != NULL) { upcri_pthread_barrier(); if (upcri_mypthread() == 0) { UPCRL_profile_finalize(); UPCRL_profile_finalize = NULL; } upcri_pthread_barrier(); gasnett_flush_streams(); upcri_pthread_barrier(); if (upcri_nodes > 1) { /* wait for all nodes to write profile */ (void) upcri_finalbarrier(0); } } #if UPCR_DEBUG /* extra barrier safety checking to detect erroneous programs with * partial thread early-exit bugs. initially check only on thread * zero, which has affinity to the necessary data. */ if (upcr_mythread() == 0) upcri_check_finalbarrier(); /* other theads must wait here for thread 0 to complete the final barrier check. * we choose a reasonably large timeout value to prevent false negatives at large scale. * for simplicity thread 0 follows this same path */ if (upcri_finalbarrier((5 + upcri_nodes / 64) * 1E6)) { /* hmm.. timed barrier failed - probably means thread zero is not here, * ie this is an erroneous program and we're an early exit thread */ upcri_check_finalbarrier(); /* if we returned from above then the exit is collective, but we may * have raced against the arbitrary value of waittime, so fall through * to the collective ucpr_global_exit() call. */ } #endif upcri_collective_exit = 1; if (!upcri_use_gasnet_exit) exit(exitval); upcr_global_exit(exitval); } static void do_bupc_init_reentrant(int *pargc, char ***pargv, int (*user_func)(int, char**)) { struct upcr_startup_spawnfuncs spawnfuncs; uintptr_t static_data_size = 0; static int alreadycalled = 0; if (alreadycalled++) return; #if UPCRI_HAVE_LINKER_SECTION static_data_size = upcri_roundup_pagesz(UPCRL_shared_end - UPCRL_shared_begin); #endif upcr_startup_init(pargc, pargv, UPCRL_static_thread_count, UPCRL_default_pthreads_per_node, UPCRL_main_name); upcr_startup_attach(UPCRL_default_shared_size, UPCRL_default_shared_offset, UPCRL_attach_flags); spawnfuncs.pre_spawn_init = UPCRL_pre_spawn_init; spawnfuncs.per_pthread_init = UPCRL_per_pthread_init; spawnfuncs.heap_init = UPCRL_heap_init; spawnfuncs.static_init = UPCRL_static_init; spawnfuncs.cache_init = UPCRL_cache_init; spawnfuncs.main_function = user_func; upcr_startup_spawn(pargc, pargv, static_data_size, UPCRL_default_cache_size, &spawnfuncs); } void bupc_init(int *pargc, char ***pargv) { #if UPCRI_UPC_PTHREADS upcri_early_err("Cannot use bupc_init with a pthreaded application"); #endif if (!UPCRL_heap_init || !UPCRL_static_init) upcri_early_err("bupc_init not supported in this version of the UPC runtime"); if (!pargc || *pargc == 0) upcri_early_err("bupc_init: invalid value passed for pargc"); if (!pargv || !(*pargv) || !(**pargv) || ***pargv == '\0') upcri_early_err("bupc_init: invalid value passed for pargv"); do_bupc_init_reentrant(pargc, pargv, NULL); } void bupc_init_reentrant(int *pargc, char ***pargv, int (*user_func)(int, char**)) { if (!UPCRL_heap_init || !UPCRL_static_init) upcri_early_err("bupc_init not supported in this version of the UPC runtime"); if (!pargc || *pargc == 0) upcri_early_err("bupc_init_reentrant: invalid value passed for pargc"); if (!pargv || !(*pargv) || !(**pargv) || ***pargv == '\0') upcri_early_err("bupc_init_reentrant: invalid value passed for pargv"); if (user_func == NULL) upcri_early_err("bupc_init_reentrant cannot be called with a NULL " "user_func"); do_bupc_init_reentrant(pargc, pargv, user_func); } char * bupc_getenv(const char *env_name) { return gasnet_getenv(env_name); } void bupc_exit(int exitcode) { upcr_exit(exitcode); } /* * bzeroes this threads's portion of a shared pointer allocation. */ #define upcri_bzero_mythreads_shareof(sptr, info) \ _upcri_bzero_mythreads_shareof(sptr, info UPCRI_PT_PASS) static void _upcri_bzero_mythreads_shareof(upcr_shared_ptr_t sptr, const upcr_startup_shalloc_t *info UPCRI_PT_ARG) { size_t blockbytes = info->blockbytes; size_t blocks = info->numblocks; size_t myblocks; upcri_assert(blockbytes != 0); upcri_assert(blocks != 0); if (info->mult_by_threads) { myblocks = blocks; } else { myblocks = (blocks + upcr_threads() - 1 - upcr_mythread()) / upcr_threads(); } /* memset all of our blocks to 0 */ bzero(upcri_shared_to_remote(upcr_add_shared(sptr, blockbytes, upcr_mythread(), 1)), blockbytes * myblocks); } /* * Same, for pshared ptrs. */ #define upcri_bzero_mythreads_pshareof(sptr, info) \ _upcri_bzero_mythreads_pshareof(sptr, info UPCRI_PT_PASS) void _upcri_bzero_mythreads_pshareof(upcr_pshared_ptr_t sptr, const upcr_startup_pshalloc_t *info UPCRI_PT_ARG) { size_t blockbytes = info->blockbytes; size_t blocks = info->numblocks; size_t myblocks; upcri_assert(blockbytes != 0); upcri_assert(blocks != 0); if (info->mult_by_threads) { myblocks = blocks; } else { myblocks = (blocks + upcr_threads() - 1 - upcr_mythread()) / upcr_threads(); } /* memset all of our blocks to 0 */ bzero(upcri_pshared_to_remote(upcr_add_pshared1(sptr, blockbytes, upcr_mythread())), blockbytes * myblocks); } /* * Allocates the specified amount of memory for each shared pointer in the * array of info structs. */ void _upcr_startup_shalloc(upcr_startup_shalloc_t *infos, size_t count UPCRI_PT_ARG) { upcr_shared_ptr_t *sptrbuf; int i; sptrbuf = UPCRI_XMALLOC(upcr_shared_ptr_t, count); /* STEP 1: UPC thread 0 does all the allocations */ if (upcr_mythread() == 0) { for (i = 0; i < count; i++) { upcr_shared_ptr_t *pptr = infos[i].sptr_addr; size_t blockbytes = infos[i].blockbytes; size_t numblocks = infos[i].numblocks; /* Skip if ptr has already had memory assigned to it via a * previous call to this function (possible if tentatively * declared) */ if (!upcr_is_init_shared(*pptr) && !upcr_isnull_shared(*pptr)) continue; for (int j = 0; j < i; j++) { // bug 3776: detect and ignore duplicates in infos // Current algorithm incurs initialization-time overhead quadratic in the // number of static shared variables in the input (ie declared within one // translation unit). If this ever becomes a bottleneck, the prior linear // behavior could be restored by introducing four new "dummy" PTS values // to store in the proxy pointers while the algorithm runs (to serve as // an in-progress flag for {init,null} x {shared,pshared}), but this // entails significant additional complexity that does not seem justified // at present. if (pptr == infos[j].sptr_addr) { upcr_setnull_shared(&sptrbuf[i]); goto next; } } if (infos[i].mult_by_threads) numblocks *= upcr_threads(); /* upcr_global_alloc will convert to local allocation * if numblocks == 1 */ sptrbuf[i] = upcr_global_alloc(numblocks, blockbytes); next:; } } /* Broadcast info about the allocations. */ upcri_broadcast(0, sptrbuf, sizeof(upcr_shared_ptr_t) * count); /* STEP 2: Every UPC thread bzeros as needed */ for (i = 0; i < count; i++) { upcr_shared_ptr_t *pptr = infos[i].sptr_addr; /* Must bzero our node's portion of the alloc'd memory if the * user's data was uninitialized. Due to tentative definition * semantics, the UPC compiler can't know this at compile time, * but the value supplied by the linker tells us, so we check it * here. */ /* Note that calling upcr_isnull_shared() with UPCR_INITIALIZED_SHARED is an error */ if (!upcr_is_init_shared(*pptr) && upcr_isnull_shared(*pptr) && !upcr_isnull_shared(sptrbuf[i])) { upcri_bzero_mythreads_shareof(sptrbuf[i], &infos[i]); } } /* Thread barrier to allow bzeros to complete before overwritting infos[i].sptr_addr */ upcri_pthread_barrier(); /* STEP 3: 0th pthread on each node copies the allocated pointers to the node's sptrs */ if (upcri_mypthread() == 0) { for (i = 0; i < count; i++) { upcr_shared_ptr_t *pptr = infos[i].sptr_addr; if (upcr_is_init_shared(*pptr) || upcr_isnull_shared(*pptr)) { upcri_assert(!upcr_isnull_shared(sptrbuf[i])); *pptr = sptrbuf[i]; upcri_pevt_shalloc(infos+i); } /* else tentatively declared variable already allocated in previous call: skip */ } } upcri_free(sptrbuf); } void _upcr_startup_pshalloc(upcr_startup_pshalloc_t *infos, size_t count UPCRI_PT_ARG) { upcr_pshared_ptr_t *sptrbuf; int i; sptrbuf = UPCRI_XMALLOC(upcr_pshared_ptr_t, count); /* STEP 1: UPC thread 0 does all the allocations */ if (upcr_mythread() == 0) { for (i = 0; i < count; i++) { upcr_pshared_ptr_t *pptr = infos[i].psptr_addr; size_t blockbytes = infos[i].blockbytes; size_t numblocks = infos[i].numblocks; /* Skip if ptr has already had memory assigned to it via a * previous call to this function (possible if tentatively * declared) */ if (!upcr_is_init_pshared(*pptr) && !upcr_isnull_pshared(*pptr)) continue; for (int j = 0; j < i; j++) { // bug 3776: detect and ignore duplicates in infos if (pptr == infos[j].psptr_addr) { upcr_setnull_pshared(&sptrbuf[i]); goto next; } } if (infos[i].mult_by_threads) numblocks *= upcr_threads(); /* global_alloc will convert to local_alloc if numblocks == 1 */ sptrbuf[i] = upcr_shared_to_pshared( upcr_global_alloc(numblocks, blockbytes)); next:; } } /* Broadcast info about the allocations. * * TODO: it would be more efficient to only send the info to the 0th * pthread on each node, instead of all UPC threads. But the additional * memcpy overhead should be minimal, and this function isn't in any * critical path. */ upcri_broadcast(0, sptrbuf, sizeof(upcr_pshared_ptr_t) * count); /* STEP 2: Every UPC thread bzeros as needed */ for (i = 0; i < count; i++) { upcr_pshared_ptr_t *pptr = infos[i].psptr_addr; /* Must bzero our threads's portion of the alloc'd memory if the * user's data was uninitialized. Due to tentative definition * semantics, the UPC compiler can't know this at compile time, * but the value supplied by the linker tells us, so we check it * here. */ /* Note that calling upcr_isnull_pshared() with UPCR_INITIALIZED_PSHARED is an error */ if (!upcr_is_init_pshared(*pptr) && upcr_isnull_pshared(*pptr) && !upcr_isnull_pshared(sptrbuf[i])) { upcri_bzero_mythreads_pshareof(sptrbuf[i], &infos[i]); } } /* Thread barrier to allow bzeros to complete before overwritting infos[i].psptr_addr */ upcri_pthread_barrier(); /* STEP 3: 0th pthread on each node copies the allocated pointers to the node's sptrs */ if (upcri_mypthread() == 0) { for (i = 0; i < count; i++) { upcr_pshared_ptr_t *pptr = infos[i].psptr_addr; if (upcr_is_init_pshared(*pptr) || upcr_isnull_pshared(*pptr)) { upcri_assert(!upcr_isnull_pshared(sptrbuf[i])); *pptr = sptrbuf[i]; upcri_pevt_pshalloc(infos+i); } /* else tentatively declared variable already allocated in previous call: skip */ } } upcri_free(sptrbuf); } void _upcr_startup_initarray(upcr_shared_ptr_t dst, void * src, upcr_startup_arrayinit_diminfo_t *diminfos, size_t dimcnt, size_t elembytes, size_t blockelems UPCRI_PT_ARG) { const size_t local_elems = (src == NULL) ? 0 : diminfos->local_elems; const size_t shared_elems = diminfos->shared_elems * (diminfos->mult_by_threads ? upcr_threads() : 1); const size_t my_first = upcr_mythread() * blockelems; uint8_t *my_src = (uint8_t*)src + (my_first * elembytes); uint8_t *my_dst = upcri_shared_to_remote_withthread(dst,upcr_mythread()); if (dimcnt != 1) { upcri_err("static initialization of multi-dimensional shared arrays is not supported"); } if (!blockelems || (shared_elems <= blockelems) || (upcr_threads() == 1)) { /* indefinate or effectively so */ if (upcr_mythread()) return; else blockelems = shared_elems; } if (my_first < shared_elems) { const size_t rowbytes = elembytes * blockelems * upcr_threads(); size_t to_copy = upcr_affinitysize(local_elems, blockelems, upcr_mythread()); const ssize_t to_zero = upcr_affinitysize(shared_elems, blockelems, upcr_mythread()) - to_copy; while (to_copy) { const size_t xfer_elems = MIN(blockelems, to_copy); const size_t xfer_len = elembytes * xfer_elems; memcpy(my_dst, my_src, xfer_len); my_src += rowbytes; my_dst += xfer_len; to_copy -= xfer_elems; } if (to_zero > 0) { bzero(my_dst, to_zero * elembytes); } } } void _upcr_startup_initparray(upcr_pshared_ptr_t dst, void * src, upcr_startup_arrayinit_diminfo_t *diminfos, size_t dimcnt, size_t elembytes, size_t blockelems UPCRI_PT_ARG) { upcri_assert((blockelems == 0) || (blockelems == 1)); upcr_startup_initarray(upcr_pshared_to_shared(dst), src, diminfos, dimcnt, elembytes, blockelems); } /* * Strings we use to verify all objects/libs compiled with same options */ GASNETT_IDENT(upcri_IdentString_GASNetConfig, "$GASNetConfig: (libupcr.a) " GASNET_CONFIG_STRING " $"); GASNETT_IDENT(upcri_IdentString_UPCRConfig, "$UPCRConfig: (libupcr.a) " UPCR_CONFIG_STRING " $"); /* Build timestamp */ GASNETT_IDENT(upcri_IdentString_BuildTimestamp, "$UPCRBuildTimestamp: built on " __DATE__ " at " __TIME__ " $"); GASNETT_IDENT(upcri_IdentString_PageSize, "$UPCRPageSize: " _STRINGIFY(UPCR_PAGESIZE) " $");