NCCL
NCCL is NVIDIA's open-source GPU communication library; it supports both collective communication and point-to-point (peer-to-peer) communication.
Take a look at one of the official demos given:
#include <> #include "cuda_runtime.h" #include "" #include "" #include <> #include <> #define MPICHECK(cmd) do { \ int e = cmd; \ if( e != MPI_SUCCESS ) { \ printf("Failed: MPI error %s:%d '%d'\n", \ __FILE__,__LINE__, e); \ exit(EXIT_FAILURE); \ } \ } while(0) #define CUDACHECK(cmd) do { \ cudaError_t e = cmd; \ if( e != cudaSuccess ) { \ printf("Failed: Cuda error %s:%d '%s'\n", \ __FILE__,__LINE__,cudaGetErrorString(e)); \ exit(EXIT_FAILURE); \ } \ } while(0) #define NCCLCHECK(cmd) do { \ ncclResult_t r = cmd; \ if (r!= ncclSuccess) { \ printf("Failed, NCCL error %s:%d '%s'\n", \ __FILE__,__LINE__,ncclGetErrorString(r)); \ exit(EXIT_FAILURE); \ } \ } while(0) static uint64_t getHostHash(const char* string) { // Based on DJB2a, result = result * 33 ^ char uint64_t result = 5381; for (int c = 0; string[c] != '\0'; c++){ result = ((result << 5) + result) ^ string[c]; } return result; } static void getHostName(char* hostname, int maxlen) { gethostname(hostname, maxlen); for (int i=0; i< maxlen; i++) { if (hostname[i] == '.') { hostname[i] = '\0'; return; } } } int main(int argc, char* argv[]) { int size = 32*1024*1024; int myRank, nRanks, localRank = 0; //initializing MPI MPICHECK(MPI_Init(&argc, &argv)); MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank)); MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks)); //calculating localRank which is used in selecting a GPU uint64_t hostHashs[nRanks]; char hostname[1024]; getHostName(hostname, 1024); hostHashs[myRank] = getHostHash(hostname); MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD)); for (int p=0; p<nRanks; p++) { if (p == myRank) break; if (hostHashs[p] == hostHashs[myRank]) localRank++; } //each process is using two GPUs int nDev = 2; float** sendbuff = (float**)malloc(nDev * sizeof(float*)); float** recvbuff = (float**)malloc(nDev * sizeof(float*)); cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev); //picking GPUs based on localRank for 
(int i = 0; i < nDev; ++i) { CUDACHECK(cudaSetDevice(localRank*nDev + i)); CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(float))); CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(float))); CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float))); CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float))); CUDACHECK(cudaStreamCreate(s+i)); } ncclUniqueId id; ncclComm_t comms[nDev]; //generating NCCL unique ID at one process and broadcasting it to all if (myRank == 0) ncclGetUniqueId(&id); MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD)); //initializing NCCL, group API is required around ncclCommInitRank as it is //called across multiple GPUs in each thread/process NCCLCHECK(ncclGroupStart()); for (int i=0; i<nDev; i++) { CUDACHECK(cudaSetDevice(localRank*nDev + i)); NCCLCHECK(ncclCommInitRank(comms+i, nRanks*nDev, id, myRank*nDev + i)); } NCCLCHECK(ncclGroupEnd()); //calling NCCL communication API. Group API is required when using //multiple devices per thread/process NCCLCHECK(ncclGroupStart()); for (int i=0; i<nDev; i++) NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum, comms[i], s[i])); NCCLCHECK(ncclGroupEnd()); //synchronizing on CUDA stream to complete NCCL communication for (int i=0; i<nDev; i++) CUDACHECK(cudaStreamSynchronize(s[i])); //freeing device memory for (int i=0; i<nDev; i++) { CUDACHECK(cudaFree(sendbuff[i])); CUDACHECK(cudaFree(recvbuff[i])); } //finalizing NCCL for (int i=0; i<nDev; i++) { ncclCommDestroy(comms[i]); } //finalizing MPI MPICHECK(MPI_Finalize()); printf("[MPI Rank %d] Success \n", myRank); return 0; }
In the example above, rank 0 calls ncclGetUniqueId to obtain the ID and then broadcasts it to the other ranks via MPI. Next, let's see how the UniqueId is generated.
// Public entry point that produces a unique id.
// Runs ncclInit(), validates the `out` pointer with PtrCheck, and then lets
// bootstrapGetUniqueId fill `out`; that call's result is the return value.
ncclResult_t ncclGetUniqueId(ncclUniqueId* out) {
  NCCLCHECK(ncclInit());
  NCCLCHECK(PtrCheck(out, "GetUniqueId", "out"));

  ncclResult_t bootstrapResult = bootstrapGetUniqueId(out);
  return bootstrapResult;
}
Then look at ncclInit
First execute initEnv to set the environment variable
Then initNet is executed to initialize the networks NCCL needs. There are two: the bootstrap network and the data communication network. The bootstrap network is mainly used during initialization to exchange small pieces of information, such as the IP address and port of each machine; because the amount of data is small and the exchange essentially happens once, during the initialization phase, the bootstrap network uses TCP. The communication network carries the actual data, so it preferentially uses RDMA (or GDR, if GDR is supported).
// Initialize the bootstrap network and select the data network stored in the
// global ncclNet.
// Order of preference, as coded below: whatever initNetPlugin put into
// ncclNet, then ncclNetIb, then ncclNetSocket.
// Returns ncclSuccess once ncclNet has been set; a failing NCCLCHECK
// propagates the error of the failing step.
ncclResult_t initNet() {
  // Always initialize bootstrap network
  NCCLCHECK(bootstrapNetInit());

  NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet));
  // A non-NULL ncclNet after initNetPlugin means a network is already chosen.
  if (ncclNet != NULL) return ncclSuccess;
  if (initNet(&ncclNetIb) == ncclSuccess) {
    ncclNet = &ncclNetIb;
  } else {
    // IB could not be initialized: fall back to sockets; failure here is fatal.
    NCCLCHECK(initNet(&ncclNetSocket));
    ncclNet = &ncclNetSocket;
  }
  return ncclSuccess;
}
bootstrapNetInit initializes the bootstrap network. It mainly uses findInterfaces to traverse all the NICs on the machine and matches them against prefixList to select which NICs to use. The information of the usable NICs is saved: each ifa_name is stored in the global bootstrapNetIfNames and each IP address in the global bootstrapNetIfAddrs. By default, every NIC except docker and lo can be used.
For example, the test machine has three NICs, xgbe0, xgbe1 and xgbe2, so the three ifa_name values and their corresponding IP addresses are saved. In addition, NCCL provides the environment variable NCCL_SOCKET_IFNAME to specify the name of the NIC you want to use; for example, export NCCL_SOCKET_IFNAME=xgbe0 selects xgbe0. Under the hood this is implemented through the prefixList matching.
static int findInterfaces(const char* prefixList, char* names, union socketAddress *addrs, int sock_family, int maxIfNameSize, int maxIfs) { struct netIf userIfs[MAX_IFS]; bool searchNot = prefixList && prefixList[0] == '^'; if (searchNot) prefixList++; bool searchExact = prefixList && prefixList[0] == '='; if (searchExact) prefixList++; int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS); int found = 0; struct ifaddrs *interfaces, *interface; getifaddrs(&interfaces); for (interface = interfaces; interface && found < maxIfs; interface = interface->ifa_next) { if (interface->ifa_addr == NULL) continue; int family = interface->ifa_addr->sa_family; if (family != AF_INET && family != AF_INET6) continue; if (sock_family != -1 && family != sock_family) continue; if (family == AF_INET6) { struct sockaddr_in6* sa = (struct sockaddr_in6*)(interface->ifa_addr); if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue; } if (!(matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) { continue; } bool duplicate = false; for (int i = 0; i < found; i++) { if (strcmp(interface->ifa_name, names+i*maxIfNameSize) == 0) { duplicate = true; break; } } if (!duplicate) { strncpy(names+found*maxIfNameSize, interface->ifa_name, maxIfNameSize); int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6); memcpy(addrs+found, interface->ifa_addr, salen); found++; } } freeifaddrs(interfaces); return found; }
Start initializing the communication network
The ncclNet_t structure is a series of function pointers, such as initialization, send, receive, etc.; socket, IB, and other communication modes all implement their own ncclNet_t, such as ncclNetSocket, ncclNetIb, etc. The process of initializing a communication network is to see which communication mode is available in turn, and then assign the value to the global ncclNet .
First, initNetPlugin is executed to check whether a network plugin shared library (.so) is present; the test environment does not have this .so, so it returns immediately.
Then try to use the IB network:
This first executes the init function of ncclNetIb, which is ncclIbInit:
// One-time discovery of the IB/RoCE devices and ports usable by NCCL.
// Fills the global ncclIbDevs / ncclNIbDevs tables and records one IP
// interface (ncclIbIfName / ncclIbIfAddr) via findInterfaces.
// Returns ncclInternalError when the verbs symbols cannot be loaded, when
// ncclParamIbDisable() is set, or when a discovery step fails; ncclSuccess
// otherwise (including the case where zero devices were found).
ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) {
  static int shownIbHcaEnv = 0;
  if(wrap_ibv_symbols() != ncclSuccess) { return ncclInternalError; }
  if (ncclParamIbDisable()) return ncclInternalError;

  // ncclNIbDevs == -1 means "not initialized yet"; it is tested once without
  // the lock and once more after taking ncclIbLock.
  if (ncclNIbDevs == -1) {
    pthread_mutex_lock(&ncclIbLock);
    wrap_ibv_fork_init();
    if (ncclNIbDevs == -1) {
      ncclNIbDevs = 0;
      // NOTE(review): this and the other early returns inside this locked
      // region exit without calling pthread_mutex_unlock(&ncclIbLock).
      if (findInterfaces(ncclIbIfName, &ncclIbIfAddr, MAX_IF_NAME_SIZE, 1) != 1) {
        WARN("NET/IB : No IP interface found.");
        return ncclInternalError;
      }

      // Detect IB cards
      int nIbDevs;
      struct ibv_device** devices;

      // Check if user defined which IB device:port to use
      char* userIbEnv = getenv("NCCL_IB_HCA");
      if (userIbEnv != NULL && shownIbHcaEnv++ == 0) INFO(NCCL_NET|NCCL_ENV, "NCCL_IB_HCA set to %s", userIbEnv);
      struct netIf userIfs[MAX_IB_DEVS];
      // Leading '^' inverts the match result; leading '=' is forwarded to
      // matchIfList as searchExact.
      bool searchNot = userIbEnv && userIbEnv[0] == '^';
      if (searchNot) userIbEnv++;
      bool searchExact = userIbEnv && userIbEnv[0] == '=';
      if (searchExact) userIbEnv++;
      int nUserIfs = parseStringList(userIbEnv, userIfs, MAX_IB_DEVS);

      if (ncclSuccess != wrap_ibv_get_device_list(&devices, &nIbDevs)) return ncclInternalError;

      for (int d=0; d<nIbDevs && ncclNIbDevs<MAX_IB_DEVS; d++) {
        struct ibv_context * context;
        if (ncclSuccess != wrap_ibv_open_device(&context, devices[d]) || context == NULL) {
          WARN("NET/IB : Unable to open device %s", devices[d]->name);
          continue;
        }
        int nPorts = 0;
        struct ibv_device_attr devAttr;
        memset(&devAttr, 0, sizeof(devAttr));
        if (ncclSuccess != wrap_ibv_query_device(context, &devAttr)) {
          WARN("NET/IB : Unable to query device %s", devices[d]->name);
          if (ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
          continue;
        }
        // Physical ports are numbered starting at 1.
        for (int port = 1; port <= devAttr.phys_port_cnt; port++) {
          struct ibv_port_attr portAttr;
          if (ncclSuccess != wrap_ibv_query_port(context, port, &portAttr)) {
            WARN("NET/IB : Unable to query port %d", port);
            continue;
          }
          // Only active ports with an InfiniBand or Ethernet (RoCE) link layer are kept.
          if (portAttr.state != IBV_PORT_ACTIVE) continue;
          if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND
              && portAttr.link_layer != IBV_LINK_LAYER_ETHERNET) continue;

          // check against user specified HCAs/ports
          if (! (matchIfList(devices[d]->name, port, userIfs, nUserIfs, searchExact) ^ searchNot)) {
            continue;
          }
          TRACE(NCCL_INIT|NCCL_NET,"NET/IB: [%d] %s:%d/%s ", d, devices[d]->name, port,
              portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
          ncclIbDevs[ncclNIbDevs].device = d;
          ncclIbDevs[ncclNIbDevs].guid = devAttr.sys_image_guid;
          ncclIbDevs[ncclNIbDevs].port = port;
          ncclIbDevs[ncclNIbDevs].link = portAttr.link_layer;
          ncclIbDevs[ncclNIbDevs].speed = ncclIbSpeed(portAttr.active_speed) * ncclIbWidth(portAttr.active_width);
          ncclIbDevs[ncclNIbDevs].context = context;
          strncpy(ncclIbDevs[ncclNIbDevs].devName, devices[d]->name, MAXNAMESIZE);
          NCCLCHECK(ncclIbGetPciPath(ncclIbDevs[ncclNIbDevs].devName, &ncclIbDevs[ncclNIbDevs].pciPath, &ncclIbDevs[ncclNIbDevs].realPort));
          ncclIbDevs[ncclNIbDevs].maxQp = devAttr.max_qp;
          ncclNIbDevs++;
          nPorts++;
          pthread_create(&ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, context);
        }
        // The context is closed only when none of the device's ports was kept.
        if (nPorts == 0 && ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
      }
      if (nIbDevs && (ncclSuccess != wrap_ibv_free_device_list(devices))) { return ncclInternalError; };
    }
    if (ncclNIbDevs == 0) {
      INFO(NCCL_INIT|NCCL_NET, "NET/IB : No device found.");
    } else {
      // Build a one-line summary of the selected device:port pairs.
      char line[1024];
      line[0] = '\0';
      for (int d=0; d<ncclNIbDevs; d++) {
        snprintf(line+strlen(line), 1023-strlen(line), " [%d]%s:%d/%s", d, ncclIbDevs[d].devName,
            ncclIbDevs[d].port, ncclIbDevs[d].link == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
      }
      line[1023] = '\0';
      char addrline[1024];
      INFO(NCCL_INIT|NCCL_NET, "NET/IB : Using%s ; OOB %s:%s", line, ncclIbIfName, socketToString(&ncclIbIfAddr.sa, addrline));
    }
    pthread_mutex_unlock(&ncclIbLock);
  }
  return ncclSuccess;
}
First, on the third line, wrap_ibv_symbols loads the dynamic library and then looks up the individual functions it provides.
Then wrap_ibv_fork_init is called to prevent fork from causing RDMA NIC read/write errors.
As you will see later, IB networks also use sockets for out-of-band transfers, so here one available NIC is obtained via findInterfaces and saved to ncclIbIfAddr.
Get all rdma devices into devices via ibv_get_device_list, traverse each device of devices, because each HCA may have multiple physical ports, so traverse each physical port for each device to get the information of each port.
Then the relevant information is saved into the global ncclIbDevs: which port of which device, whether the link is IB or RoCE, the PCI path of the device, maxQp, the device name, and so on. Note that, analogous to NCCL_SOCKET_IFNAME for the bootstrap network, there is an environment variable NCCL_IB_HCA here that specifies which IB HCA to use.
At this point the whole initialization process is complete. Summed up in one sentence: it finds all the available IB NICs and ordinary Ethernet NICs on the current machine and saves them.
Then start generating the UniqueId
// Open the root listening socket and start the bootstrapRoot thread on it.
// The storage behind `id` is reinterpreted as an ncclNetHandle_t, which
// bootstrapNetListen fills in.
//   idFromEnv - when true, listen with dontCareIf; otherwise with device 0.
// Returns ncclSuccess, or the error propagated by NCCLCHECK.
ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) {
  void* listenComm;
  int listenDev = idFromEnv ? dontCareIf : 0;
  NCCLCHECK(bootstrapNetListen(listenDev, (ncclNetHandle_t*) id, &listenComm));

  // The listening comm is handed over to the root thread.
  pthread_t rootThread;
  pthread_create(&rootThread, NULL, bootstrapRoot, listenComm);
  return ncclSuccess;
}
ncclNetHandle_t is also a character array; the ncclUniqueId pointer is cast to it, and then bootstrapNetListen is executed.
// Create a listening bootstrap socket.
//   dev        - >= 0: listen on that bootstrap interface;
//                findSubnetIf: handled by a branch elided in this excerpt;
//                otherwise the handle already stores a local address
//   netHandle  - in/out: viewed as a union socketAddress; passed to
//                createListenSocket as the address to listen on
//   listenComm - output: the newly created bootstrapNetComm
// Returns ncclSuccess, or the error propagated by NCCLCHECK.
static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm) {
  union socketAddress* connectAddr = (union socketAddress*) netHandle;
  static_assert(sizeof(union socketAddress) < NCCL_NET_HANDLE_MAXSIZE, "union socketAddress size is too large");
  // if dev >= 0, listen based on dev
  if (dev >= 0) {
    NCCLCHECK(bootstrapNetGetSocketAddr(dev, connectAddr));
  } else if (dev == findSubnetIf) {
    // ... (body of this branch is elided in this excerpt)
  } // Otherwise, handle stores a local address
  struct bootstrapNetComm* comm;
  NCCLCHECK(bootstrapNetNewComm(&comm));
  NCCLCHECK(createListenSocket(&comm->fd, connectAddr));
  *listenComm = comm;
  return ncclSuccess;
}
Looking at the three functions in turn, get an available ip address via bootstrapNetGetSocketAddr.
// Copy the address of bootstrap interface `dev` into `addr`.
// Returns ncclInternalError when `dev` is not a valid index into
// bootstrapNetIfAddrs (negative, or >= bootstrapNetIfs); ncclSuccess otherwise.
static ncclResult_t bootstrapNetGetSocketAddr(int dev, union socketAddress* addr) {
  // Reject negative indices as well, so the table is never read out of bounds.
  if (dev < 0 || dev >= bootstrapNetIfs) return ncclInternalError;
  memcpy(addr, bootstrapNetIfAddrs+dev, sizeof(*addr));
  return ncclSuccess;
}
At this point dev is 0. bootstrapNetIfs is the number of available NICs found when the bootstrap network was initialized, so this fetches the 0th available IP address.
Then a bootstrapNetComm is created via bootstrapNetNewComm. A bootstrapNetComm is really just an fd, and bootstrapNetNewComm simply creates a new bootstrapNetComm.
// Bootstrap communicator: holds a single socket file descriptor.
struct bootstrapNetComm {
  int fd;  // socket file descriptor
};
Next, a socket server is started via createListenSocket:
// Create a TCP listening socket bound to *localAddr.
//   fd        - output: the listening socket descriptor
//   localAddr - in/out: address to bind; after bind it is overwritten with
//               the result of getsockname, i.e. including the assigned port
// Returns ncclSystemError if socket() fails, ncclSuccess on success.
// NOTE(review): if SYSCHECK returns early on failure, sockfd is not closed
// on those paths - confirm against the SYSCHECK definition.
static ncclResult_t createListenSocket(int *fd, union socketAddress *localAddr) {
  /* IPv4/IPv6 support */
  int family = localAddr->sa.sa_family;
  int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);

  /* Create socket and bind it to a port */
  int sockfd = socket(family, SOCK_STREAM, 0);
  if (sockfd == -1) {
    WARN("Net : Socket creation failed : %s", strerror(errno));
    return ncclSystemError;
  }

  if (socketToPort(&localAddr->sa)) {
    // Port is forced by env. Make sure we get the port.
    int opt = 1;
    // Socket option names are distinct integers, not bit flags, so each
    // option needs its own setsockopt call; OR-ing the names together would
    // select some other single option.
    SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt");
#if defined(SO_REUSEPORT)
    SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt");
#endif
  }

  // localAddr port should be 0 (Any port)
  SYSCHECK(bind(sockfd, &localAddr->sa, salen), "bind");

  /* Get the assigned Port */
  socklen_t size = salen;
  SYSCHECK(getsockname(sockfd, &localAddr->sa, &size), "getsockname");

#ifdef ENABLE_TRACE
  char line[1024];
  TRACE(NCCL_INIT|NCCL_NET,"Listening on socket %s", socketToString(&localAddr->sa, line));
#endif

  /* Put the socket in listen mode
   * NB: The backlog will be silently truncated to the value in /proc/sys/net/core/somaxconn
   */
  SYSCHECK(listen(sockfd, 16384), "listen");
  *fd = sockfd;
  return ncclSuccess;
}
This creates a listening fd. The IP is specified by localAddr and the initial port is 0, so bind picks a random available port; the IP and port are then written back into localAddr via getsockname(sockfd, &localAddr->sa, &size). Here localAddr is the UniqueId.
By this point the UniqueId is also generated, which is actually the ip and port of the current machine.
Welcome to try the latest version of OneFlow:/Oneflow-Inc…
The above is a detailed source-code analysis of NCCL initialization and ncclUniqueId generation. For more on NCCL initialization and ncclUniqueId generation, please see my other related articles!