Alpaka: Race condition in CUDA blocking queue

Created on 26 Sep 2019  ·  15 Comments  ·  Source: alpaka-group/alpaka

@krzikalla provided me with a test case where we have data races between a synchronous memcpy and a kernel.

The problem is that we create the streams with cudaStreamCreateWithFlags(..., cudaStreamNonBlocking). The CUDA documentation states that cudaStreamNonBlocking disables the blocking behavior with respect to the default stream 0. In alpaka we use blocking memcpy operations, e.g. cudaMemcpy, if we use the QueueCudaRtBlocking. The result is that our memcpy operations run without blocking with respect to all enqueued kernels.

This bug is also present in the latest release, 0.3.5. This means we need to do a bugfix release even though we will release 0.4.0 soon.

We have different possibilities to solve it

  • create blocking queues with cudaStreamCreate()

    • this will introduce the side effect that a memcpy will block all streams

  • use asynchronous memcpy and do an explicit synchronization after each call

    • will increase the host-side overhead, since an API call has a latency of ~10 - 16 us

#include <stdio.h>
#include "alpaka/alpaka.hpp"

//#define NATIVE_CUDA 0
#ifdef NATIVE_CUDA

// Terminates the program when a CUDA runtime call does not return cudaSuccess.
// The argument is evaluated exactly once and the failing line plus the CUDA
// error string are printed. The do { } while (0) wrapper makes the macro expand
// to one statement, so `if (c) CUDA_ASSERT(x); else ...` binds as written.
#define CUDA_ASSERT(x) \
  do { \
    const cudaError_t cudaAssertResult_ = (x); \
    if (cudaAssertResult_ != cudaSuccess) { \
      printf("cuda fail on line %d: %s\n", __LINE__, cudaGetErrorString(cudaAssertResult_)); \
      exit(1); \
    } \
  } while (0)


// Kernel without any memory traffic: its only statement is a device-side
// assert that the argument equals 1. main() launches it once with argc before
// the host-to-device copy, so the assert holds when the program is started
// without additional command-line arguments.
__global__ void emptyCudaKernel(int threadElementExtent)
{
  assert(threadElementExtent == 1);
}

// Verification kernel: each thread whose global index lies inside the buffer
// checks that its element equals 1.0 and otherwise prints blockDim.x,
// blockIdx.x, threadIdx.x and the value it observed.
//
// sourceData: device pointer to at least 900 * 5 * 5 doubles.
__global__ void myCudaKernel(const double* sourceData)
{
  // Must match the element count allocated and copied by main().
  constexpr size_t size = 900 * 5 * 5;
  const size_t i = static_cast<size_t>(blockIdx.x) * blockDim.x + threadIdx.x;
  // The grid is rounded up to whole blocks, so the surplus threads of the last
  // block must not read past the end of the buffer.
  if (i < size && sourceData[i] != 1.0)
    printf("%u %u %u: %f\n",blockDim.x, blockIdx.x, threadIdx.x, sourceData[i]);
}



int main(int argc, char* argv[])
{
  cudaStream_t cudaStream;
  void * memPtr;
  const size_t size = 900 * 5 * 5;

  CUDA_ASSERT(cudaSetDevice(0));
  CUDA_ASSERT(cudaStreamCreateWithFlags(&cudaStream, cudaStreamNonBlocking));  
  CUDA_ASSERT(cudaMalloc(&memPtr, size * sizeof(double)));

  // note: here we assume size is not a multiple of 64, this is unrelated to the issue
  dim3 gridDim(std::size_t(((size - 1) / 64) + 1), 1u, 1u);
  dim3 blockDim(64u, 1u, 1u);
  emptyCudaKernel<<<gridDim, blockDim, 0, cudaStream>>>(argc);
  CUDA_ASSERT(cudaStreamSynchronize(cudaStream));

  std::vector<double> sourceMemHost(size, 1.0);
  CUDA_ASSERT(cudaMemcpy(memPtr, sourceMemHost.data(), size * sizeof(double), cudaMemcpyHostToDevice));
  cudaStreamSynchronize(cudaStream);

  myCudaKernel<<<gridDim, blockDim, 0, cudaStream>>>((double*)memPtr);

  CUDA_ASSERT(cudaStreamSynchronize(cudaStream));
  CUDA_ASSERT(cudaStreamDestroy(cudaStream));
  return 0;
}


#else

// alpaka kernel functor that verifies the device buffer: a thread whose global
// index lies inside the buffer prints blockDim.x, blockIdx.x, threadIdx.x and
// the element value when that element differs from 1.0.
// NOTE: the printf arguments are CUDA built-in variables, so this functor is
// only usable with a CUDA accelerator.
struct MyKernel
{
   // acc:        accelerator object, used to query the global thread index.
   // sourceData: device pointer to at least 900 * 5 * 5 doubles.
   template<typename Acc>
   ALPAKA_FN_ACC void operator()(Acc const & acc, const double* sourceData) const
   {
     // Must match the element count allocated and copied by main().
     constexpr size_t size = 900 * 5 * 5;
     int i = alpaka::idx::getIdx<alpaka::Grid, alpaka::Threads>(acc)[0u];
     // The work division is rounded up to whole blocks, so surplus threads
     // must not read past the end of the buffer.
     // The value is a double and therefore printed with %f (was %lu).
     if(i < size && sourceData[i] != 1.0)
       printf("%u %u %u %f\n",blockDim.x, blockIdx.x, threadIdx.x, sourceData[i]);
   }
};

// alpaka kernel functor without any memory traffic: its only statement is an
// assert that the argument equals 1. main() enqueues it once with argc before
// the host-to-device copy.
struct EmptyKernel
{
   // acc is not used by the body; threadElementExtent is expected to be 1.
   template<typename Acc>
   ALPAKA_FN_ACC void operator()(Acc const & acc, int threadElementExtent) const
   {
     assert(threadElementExtent == 1);
   }
};



// alpaka reproducer (API names of the 0.3.x releases) for the issue described
// on this page: enqueue EmptyKernel, copy a host vector filled with 1.0 into a
// device buffer through the same stream, then enqueue MyKernel to verify the
// device data. The statement order is the point of the test; do not reorder.
int main(int argc, char* argv[])
{
   // Number of doubles in the buffer.
   const size_t size = 900 * 5 * 5;

   // One-dimensional CUDA accelerator with its device and the synchronous stream type.
   using ComputeAccelerator = alpaka::acc::AccGpuCudaRt<alpaka::dim::DimInt<1>, std::size_t>;
   using ComputeDevice = alpaka::dev::Dev<ComputeAccelerator>;
   using ComputeStream = alpaka::stream::StreamCudaRtSync;

   ComputeDevice computeDevice(alpaka::pltf::getDevByIdx<alpaka::pltf::Pltf<ComputeDevice> >(0));
   ComputeStream computeStream (computeDevice);

   // Work division: ceil(size / 64), 64 and 1 -- presumably blocks per grid,
   // threads per block and elements per thread (confirm against WorkDivMembers).
   using V = alpaka::vec::Vec<alpaka::dim::DimInt<1>, std::size_t>;
   using WorkDivision = alpaka::workdiv::WorkDivMembers<alpaka::dim::DimInt<1>, std::size_t>;
   WorkDivision wd(V(std::size_t(((size - 1) / 64) + 1)), V(std::size_t(64)), V(std::size_t(1)));

   // Host-side types: the view type is derived from the buffer type a host
   // allocation would return, so that it can wrap the std::vector storage below.
   using HostAccelerator = alpaka::acc::AccCpuOmp2Blocks<alpaka::dim::DimInt<1>, std::size_t>;
   using HostDevice = alpaka::dev::Dev<HostAccelerator>;
   alpaka::vec::Vec<alpaka::dim::DimInt<1>, size_t> bufferSize (size);
   using HostBufferType = decltype(
     alpaka::mem::buf::alloc<double, size_t>(std::declval<HostDevice>(), bufferSize));
   using HostViewType = alpaka::mem::view::ViewPlainPtr<alpaka::dev::Dev<HostBufferType>,
     alpaka::elem::Elem<HostBufferType>, alpaka::dim::Dim<HostBufferType>, alpaka::size::Size<HostBufferType> >;

   HostDevice hostDevice(alpaka::pltf::getDevByIdx<alpaka::pltf::Pltf<HostDevice> >(0u));

   // Device buffer that the verification kernel reads.
   auto sourceMem = alpaka::mem::buf::alloc<double, size_t>(computeDevice, size);

   // First task in the stream: the kernel that only asserts argc == 1.
   alpaka::stream::enqueue(computeStream, alpaka::exec::create<ComputeAccelerator>(wd, EmptyKernel(), argc));

   // Host data (all 1.0), wrapped in a view and copied to the device through the stream.
   std::vector<double> sourceMemHost(size, 1.0);
   HostViewType hostBufferView(sourceMemHost.data(), hostDevice, bufferSize);
   alpaka::mem::view::copy(computeStream, sourceMem, hostBufferView, bufferSize);
   alpaka::wait::wait(computeStream);

   // Verification kernel: prints every element that is not 1.0.
   alpaka::stream::enqueue(computeStream,
     alpaka::exec::create<ComputeAccelerator>(wd, MyKernel(), alpaka::mem::view::getPtrNative(sourceMem)));

   alpaka::wait::wait(computeStream);

   return 0;
}
#endif
CUDA Bug

Most helpful comment

Test code for the upcoming alpaka 0.4.0 (added all renamings):

#include <stdio.h>
#include "alpaka/alpaka.hpp"

//#define NATIVE_CUDA 0
#ifdef NATIVE_CUDA

// Terminates the program when a CUDA runtime call does not return cudaSuccess.
// The argument is evaluated exactly once and the failing line plus the CUDA
// error string are printed. The do { } while (0) wrapper makes the macro expand
// to one statement, so `if (c) CUDA_ASSERT(x); else ...` binds as written.
#define CUDA_ASSERT(x) \
  do { \
    const cudaError_t cudaAssertResult_ = (x); \
    if (cudaAssertResult_ != cudaSuccess) { \
      printf("cuda fail on line %d: %s\n", __LINE__, cudaGetErrorString(cudaAssertResult_)); \
      exit(1); \
    } \
  } while (0)


// Kernel without any memory traffic: its only statement is a device-side
// assert that the argument equals 1. main() launches it once with argc before
// the host-to-device copy, so the assert holds when the program is started
// without additional command-line arguments.
__global__ void emptyCudaKernel(int threadElementExtent)
{
  assert(threadElementExtent == 1);
}

// Verification kernel: a thread whose global index lies inside the buffer
// prints blockDim.x, blockIdx.x, threadIdx.x and the element value when that
// element differs from 1.0. Threads beyond the end of the buffer do nothing.
__global__ void myCudaKernel(const double* sourceData)
{
  constexpr size_t elementCount = 900 * 5 * 5;
  const int globalIdx = blockIdx.x*blockDim.x + threadIdx.x;

  // Surplus threads of the rounded-up grid have no element to check.
  if (!(globalIdx < elementCount))
    return;

  // The element holds the expected value: nothing to report.
  if (!(sourceData[globalIdx] != 1.0))
    return;

  printf("%u %u %u: %f\n",blockDim.x, blockIdx.x, threadIdx.x, sourceData[globalIdx]);
}



int main(int argc, char* argv[])
{
  cudaStream_t cudaStream;
  void * memPtr;
  const size_t size = 900 * 5 * 5;

  CUDA_ASSERT(cudaSetDevice(0));
  CUDA_ASSERT(cudaStreamCreateWithFlags(&cudaStream, cudaStreamNonBlocking));
  CUDA_ASSERT(cudaMalloc(&memPtr, size * sizeof(double)));

  // note: here we assume size is not a multiple of 64, this is unrelated to the issue
  dim3 gridDim(std::size_t(((size - 1) / 64) + 1), 1u, 1u);
  dim3 blockDim(64u, 1u, 1u);
  emptyCudaKernel<<<gridDim, blockDim, 0, cudaStream>>>(argc);
  CUDA_ASSERT(cudaStreamSynchronize(cudaStream));

  std::vector<double> sourceMemHost(size, 1.0);
  CUDA_ASSERT(cudaMemcpy(memPtr, sourceMemHost.data(), size * sizeof(double), cudaMemcpyHostToDevice));
  cudaStreamSynchronize(cudaStream);

  myCudaKernel<<<gridDim, blockDim, 0, cudaStream>>>((double*)memPtr);

  CUDA_ASSERT(cudaStreamSynchronize(cudaStream));
  CUDA_ASSERT(cudaStreamDestroy(cudaStream));
  return 0;
}


#else

// alpaka kernel functor that verifies the device buffer: a thread whose global
// index lies inside the buffer prints blockDim.x, blockIdx.x, threadIdx.x and
// the element value when that element differs from 1.0.
// NOTE: the printf arguments are CUDA built-in variables, so this functor is
// only usable with a CUDA accelerator.
struct MyKernel
{
   // acc:        accelerator object, used to query the global thread index.
   // sourceData: device pointer to at least 900 * 5 * 5 doubles.
   template<typename Acc>
   ALPAKA_FN_ACC void operator()(Acc const & acc, const double* sourceData) const
   {
     // Must match the element count allocated and copied by main().
     constexpr size_t size = 900 * 5 * 5;
     int i = alpaka::idx::getIdx<alpaka::Grid, alpaka::Threads>(acc)[0u];
     // Threads past the end of the buffer are skipped by the range check.
     // The value is a double and therefore printed with %f (was %lu).
     if(i < size && sourceData[i] != 1.0)
       printf("%u %u %u %f\n",blockDim.x, blockIdx.x, threadIdx.x, sourceData[i]);
   }
};

// alpaka kernel functor without any memory traffic: its only statement is an
// assert that the argument equals 1. main() enqueues it once with argc before
// the host-to-device copy.
struct EmptyKernel
{
   // acc is not used by the body; threadElementExtent is expected to be 1.
   template<typename Acc>
   ALPAKA_FN_ACC void operator()(Acc const & acc, int threadElementExtent) const
   {
     assert(threadElementExtent == 1);
   }
};



// alpaka reproducer (API names of the 0.4.0 release) for the issue described
// on this page: enqueue EmptyKernel, copy a host vector filled with 1.0 into a
// device buffer through the same queue, then enqueue MyKernel to verify the
// device data. The statement order is the point of the test; do not reorder.
int main(int argc, char* argv[])
{
   // Number of doubles in the buffer.
   const size_t size = 900 * 5 * 5;

   // One-dimensional CUDA accelerator with its device and the blocking queue type.
   using ComputeAccelerator = alpaka::acc::AccGpuCudaRt<alpaka::dim::DimInt<1>, std::size_t>;
   using ComputeDevice = alpaka::dev::Dev<ComputeAccelerator>;
   using ComputeStream = alpaka::queue::QueueCudaRtBlocking;

   ComputeDevice computeDevice(alpaka::pltf::getDevByIdx<alpaka::pltf::Pltf<ComputeDevice> >(0));
   ComputeStream computeStream (computeDevice);

   // Work division: ceil(size / 64), 64 and 1 -- presumably blocks per grid,
   // threads per block and elements per thread (confirm against WorkDivMembers).
   using V = alpaka::vec::Vec<alpaka::dim::DimInt<1>, std::size_t>;
   using WorkDivision = alpaka::workdiv::WorkDivMembers<alpaka::dim::DimInt<1>, std::size_t>;
   WorkDivision wd(V(std::size_t(((size - 1) / 64) + 1)), V(std::size_t(64)), V(std::size_t(1)));

   // Host-side types: the view type is derived from the buffer type a host
   // allocation would return, so that it can wrap the std::vector storage below.
   using HostAccelerator = alpaka::acc::AccCpuOmp2Blocks<alpaka::dim::DimInt<1>, std::size_t>;
   using HostDevice = alpaka::dev::Dev<HostAccelerator>;
   alpaka::vec::Vec<alpaka::dim::DimInt<1>, size_t> bufferSize (size);
   using HostBufferType = decltype(
     alpaka::mem::buf::alloc<double, size_t>(std::declval<HostDevice>(), bufferSize));
   using HostViewType = alpaka::mem::view::ViewPlainPtr<alpaka::dev::Dev<HostBufferType>,
     alpaka::elem::Elem<HostBufferType>, alpaka::dim::Dim<HostBufferType>, alpaka::idx::Idx<HostBufferType> >;

   HostDevice hostDevice(alpaka::pltf::getDevByIdx<alpaka::pltf::Pltf<HostDevice> >(0u));

   // Device buffer that the verification kernel reads.
   auto sourceMem = alpaka::mem::buf::alloc<double, size_t>(computeDevice, size);

   // First task in the queue: the kernel that only asserts argc == 1.
   alpaka::queue::enqueue(computeStream, alpaka::kernel::createTaskKernel<ComputeAccelerator>(wd, EmptyKernel(), argc));

   // Host data (all 1.0), wrapped in a view and copied to the device through the queue.
   std::vector<double> sourceMemHost(size, 1.0);
   HostViewType hostBufferView(sourceMemHost.data(), hostDevice, bufferSize);
   alpaka::mem::view::copy(computeStream, sourceMem, hostBufferView, bufferSize);
   alpaka::wait::wait(computeStream);

   // Verification kernel: prints every element that is not 1.0.
   alpaka::queue::enqueue(computeStream,
     alpaka::kernel::createTaskKernel<ComputeAccelerator>(wd, MyKernel(), alpaka::mem::view::getPtrNative(sourceMem)));

   alpaka::wait::wait(computeStream);

   return 0;
}
#endif

All 15 comments

This is somehow related to #768, #783 and #791 where we added support for using CPU queues from multiple host threads.
This was not supposed to work in the 3.x release line and so I would not port back this fix. It also does not work for CPU queues there. Porting back everything needed to make this feature work in the old releases would be a huge amount of work which I would not want anyone to waste his time on.
For the 4.0 release on the other hand it is an issue that should be fixed because we now also support this for the CPU queues.

What we implemented for the blocking CPU queue is the usage of a std::mutex that is locked. As a result, when multiple concurrent host threads try to execute something, only one task is ever executing at a specific time, while all other host threads are blocked waiting. However, the order in which the waiting tasks are executed is not specified (nobody guarantees first come - first serve). If you rely on the order of tasks being executed on the blocking CPU queue, you still have to do manual synchronization within the host threads so that only one thread is ever trying to execute something in the queue at any time.
The only thing that the blocking CPU queue currently achieves is that only one task is ever executed at any time, even when called from multiple threads. It does NOT preserve the order of calls. You might see this as a bug, but until now this was no requirement (the same is true for this "bug").

The blocking GPU queue does not even achieve this "only one task in the queue is executed at any time" property when called from multiple threads as this was no requirement up until now. If you enqueued two memcpys from two host threads they are executed in parallel. Also the currentThreadWaitFor and empty functions on a blocking GPU queue do not work as expected when called from a different host thread because the blocking CUDA operations are never enqueued to a CUDA queue at all.

To fix the issue that currentThreadWaitFor and empty do not work we have to use the asynchronous CUDA memory operations together with cudaStreamSynchronize.

There would still be an issue that multiple tasks that are enqueued in parallel into a blocking GPU queue may overlap their calls to cudaXxxAsync and cudaStreamSynchronize, leading to

cudaXxxAsync1
cudaXxxAsync2
cudaStreamSynchronize1
cudaStreamSynchronize2

being received by the CUDA queue. This would be a pessimization because the host thread 1 would wait for the task enqueued by host thread 1 and 2. To prevent such things we would have to guard the access to the internal CUDA queue with host side synchronization (std::mutex lock).

This would, on the other hand, introduce the issue that std::mutex lock is not first come - first serve, which is also present for the blocking CPU queue. To fix this we would have to adapt something like this.

This is somehow related to #768, #783 and #791 where we added support for using CPU queues from multiple host threads.
This was not supposed to work in the 3.x release line and so I would not port back this fix. It also does not work for CPU queues there. Porting back everything needed to make this feature work in the old releases would be a huge amount of work which I would not want anyone to waste his time on.
For the 4.0 release on the other hand it is an issue that should be fixed because we now also support this for the CPU queues.

Maybe I understood you wrong, but we need to fix it on 0.3.5. The example shows that we enqueue the kernel and the memcpy into a stream, but since we ignore the stream in the implementation of enqueue in the blocking queue, we get the data race.
A simple fix for 0.3.5 is to use cudaStreamCreate without any flags; then everything works.

For 0.4.0 I will provide a fix on Monday. To have the same behaviour on CPU and GPU we should always use the async API and wait after each memcpy.

A simple fix for 0.3.5 is to use cudaStreamCreate without any flags; then everything works.

You can do this, but as you already wrote above this will have an undesired side effect that could also be classified as a bug. This will fix a bug for some people but introduce a bug for others.

Where exactly is the bug that you described? In your example you only have one stream and only one host CPU thread. In an abstract way what I see is the following order of tasks called on an StreamCudaRtSync:

alpaka::stream::enqueue()
alpaka::mem::view::copy()
alpaka::wait::wait()
alpaka::stream::enqueue()
alpaka::wait::wait()

The wait calls should not be necessary at all, the enqueue (has an cudaStreamSynchronize) and copy (uses cudaMemcpy) tasks should be blocking for this queue. What am I missing?

You missed that cudaMemcpy is not using any stream and is therefore enqueued in the CUDA default stream instead of the user-requested stream/queue. Because of that, the memcpy is performed in parallel/asynchronously to the kernel.
Normally cudaMemcpy blocks all streams and is not started by the CUDA driver before the work in all other streams has finished. But since we create the CUDA streams we use in alpaka with the cudaStreamNonBlocking flag, we explicitly disabled this behavior. So all CUDA API calls we use without a stream argument, e.g. cudaMemcpy, will be performed sequentially, but kernels will run in the user-given stream/queue and will never wait.

I looked again at my CUDA example above and realised that I copied the version where I used cudaMemcpyAsync instead of cudaMemcpy. Maybe that is why you cannot see the issue. Sorry for that. I will fix it in a few seconds.

Mhhhh, now I am also confused. Normally cudaMemcpy must block the executing thread until the memcpy has finished. It looks like this is not the case. I will have a look at this issue again on Monday.

This is exactly what I am wondering as well. My understanding was that cudaMemcpy blocks the calling host CPU thread.

Maybe alpaka calls cudaMemcpyPeer?
IIRC the native cuda code in the above example works while the alpaka code doesn't. The actual design flaw is ignoring the stream for alpaka::mem::view::copy. Maybe the correct solution for 0.4.0 is to use cudaMemcpyAsync. It is a bug nevertheless.
BTW we don't bother about a backport in 0.3.5.

@krzikalla no, in that code snippet both CUDA and alpaka do not work due to race condition (so sometimes it might run fine, but not guaranteed). And they should not because of cudaMemcpy in the default stream not synchronizing with other streams created with cudaStreamNonBlocking. It is supposed to not synchronize, which was not clear to us before. So as suggested above, we could either use default stream creation flag instead of cudaStreamNonBlocking or use cudaMemcpyAsync with proper streams instead of cudaMemcpy.

What is the plan for this issue for the 0.4.0 release?

My plan is to switch fully to the async api to have the same behavior for all backends.
I will work on that this week.

Test code for the upcoming alpaka 0.4.0 (added all renamings):

#include <stdio.h>
#include "alpaka/alpaka.hpp"

//#define NATIVE_CUDA 0
#ifdef NATIVE_CUDA

// Terminates the program when a CUDA runtime call does not return cudaSuccess.
// The argument is evaluated exactly once and the failing line plus the CUDA
// error string are printed. The do { } while (0) wrapper makes the macro expand
// to one statement, so `if (c) CUDA_ASSERT(x); else ...` binds as written.
#define CUDA_ASSERT(x) \
  do { \
    const cudaError_t cudaAssertResult_ = (x); \
    if (cudaAssertResult_ != cudaSuccess) { \
      printf("cuda fail on line %d: %s\n", __LINE__, cudaGetErrorString(cudaAssertResult_)); \
      exit(1); \
    } \
  } while (0)


// Kernel without any memory traffic: its only statement is a device-side
// assert that the argument equals 1. main() launches it once with argc before
// the host-to-device copy, so the assert holds when the program is started
// without additional command-line arguments.
__global__ void emptyCudaKernel(int threadElementExtent)
{
  assert(threadElementExtent == 1);
}

// Verification kernel: a thread whose global index lies inside the buffer
// prints blockDim.x, blockIdx.x, threadIdx.x and the element value when that
// element differs from 1.0. Threads beyond the end of the buffer do nothing.
__global__ void myCudaKernel(const double* sourceData)
{
  constexpr size_t elementCount = 900 * 5 * 5;
  const int globalIdx = blockIdx.x*blockDim.x + threadIdx.x;

  // Surplus threads of the rounded-up grid have no element to check.
  if (!(globalIdx < elementCount))
    return;

  // The element holds the expected value: nothing to report.
  if (!(sourceData[globalIdx] != 1.0))
    return;

  printf("%u %u %u: %f\n",blockDim.x, blockIdx.x, threadIdx.x, sourceData[globalIdx]);
}



int main(int argc, char* argv[])
{
  cudaStream_t cudaStream;
  void * memPtr;
  const size_t size = 900 * 5 * 5;

  CUDA_ASSERT(cudaSetDevice(0));
  CUDA_ASSERT(cudaStreamCreateWithFlags(&cudaStream, cudaStreamNonBlocking));
  CUDA_ASSERT(cudaMalloc(&memPtr, size * sizeof(double)));

  // note: here we assume size is not a multiple of 64, this is unrelated to the issue
  dim3 gridDim(std::size_t(((size - 1) / 64) + 1), 1u, 1u);
  dim3 blockDim(64u, 1u, 1u);
  emptyCudaKernel<<<gridDim, blockDim, 0, cudaStream>>>(argc);
  CUDA_ASSERT(cudaStreamSynchronize(cudaStream));

  std::vector<double> sourceMemHost(size, 1.0);
  CUDA_ASSERT(cudaMemcpy(memPtr, sourceMemHost.data(), size * sizeof(double), cudaMemcpyHostToDevice));
  cudaStreamSynchronize(cudaStream);

  myCudaKernel<<<gridDim, blockDim, 0, cudaStream>>>((double*)memPtr);

  CUDA_ASSERT(cudaStreamSynchronize(cudaStream));
  CUDA_ASSERT(cudaStreamDestroy(cudaStream));
  return 0;
}


#else

// alpaka kernel functor that verifies the device buffer: a thread whose global
// index lies inside the buffer prints blockDim.x, blockIdx.x, threadIdx.x and
// the element value when that element differs from 1.0.
// NOTE: the printf arguments are CUDA built-in variables, so this functor is
// only usable with a CUDA accelerator.
struct MyKernel
{
   // acc:        accelerator object, used to query the global thread index.
   // sourceData: device pointer to at least 900 * 5 * 5 doubles.
   template<typename Acc>
   ALPAKA_FN_ACC void operator()(Acc const & acc, const double* sourceData) const
   {
     // Must match the element count allocated and copied by main().
     constexpr size_t size = 900 * 5 * 5;
     int i = alpaka::idx::getIdx<alpaka::Grid, alpaka::Threads>(acc)[0u];
     // Threads past the end of the buffer are skipped by the range check.
     // The value is a double and therefore printed with %f (was %lu).
     if(i < size && sourceData[i] != 1.0)
       printf("%u %u %u %f\n",blockDim.x, blockIdx.x, threadIdx.x, sourceData[i]);
   }
};

// alpaka kernel functor without any memory traffic: its only statement is an
// assert that the argument equals 1. main() enqueues it once with argc before
// the host-to-device copy.
struct EmptyKernel
{
   // acc is not used by the body; threadElementExtent is expected to be 1.
   template<typename Acc>
   ALPAKA_FN_ACC void operator()(Acc const & acc, int threadElementExtent) const
   {
     assert(threadElementExtent == 1);
   }
};



// alpaka reproducer (API names of the 0.4.0 release) for the issue described
// on this page: enqueue EmptyKernel, copy a host vector filled with 1.0 into a
// device buffer through the same queue, then enqueue MyKernel to verify the
// device data. The statement order is the point of the test; do not reorder.
int main(int argc, char* argv[])
{
   // Number of doubles in the buffer.
   const size_t size = 900 * 5 * 5;

   // One-dimensional CUDA accelerator with its device and the blocking queue type.
   using ComputeAccelerator = alpaka::acc::AccGpuCudaRt<alpaka::dim::DimInt<1>, std::size_t>;
   using ComputeDevice = alpaka::dev::Dev<ComputeAccelerator>;
   using ComputeStream = alpaka::queue::QueueCudaRtBlocking;

   ComputeDevice computeDevice(alpaka::pltf::getDevByIdx<alpaka::pltf::Pltf<ComputeDevice> >(0));
   ComputeStream computeStream (computeDevice);

   // Work division: ceil(size / 64), 64 and 1 -- presumably blocks per grid,
   // threads per block and elements per thread (confirm against WorkDivMembers).
   using V = alpaka::vec::Vec<alpaka::dim::DimInt<1>, std::size_t>;
   using WorkDivision = alpaka::workdiv::WorkDivMembers<alpaka::dim::DimInt<1>, std::size_t>;
   WorkDivision wd(V(std::size_t(((size - 1) / 64) + 1)), V(std::size_t(64)), V(std::size_t(1)));

   // Host-side types: the view type is derived from the buffer type a host
   // allocation would return, so that it can wrap the std::vector storage below.
   using HostAccelerator = alpaka::acc::AccCpuOmp2Blocks<alpaka::dim::DimInt<1>, std::size_t>;
   using HostDevice = alpaka::dev::Dev<HostAccelerator>;
   alpaka::vec::Vec<alpaka::dim::DimInt<1>, size_t> bufferSize (size);
   using HostBufferType = decltype(
     alpaka::mem::buf::alloc<double, size_t>(std::declval<HostDevice>(), bufferSize));
   using HostViewType = alpaka::mem::view::ViewPlainPtr<alpaka::dev::Dev<HostBufferType>,
     alpaka::elem::Elem<HostBufferType>, alpaka::dim::Dim<HostBufferType>, alpaka::idx::Idx<HostBufferType> >;

   HostDevice hostDevice(alpaka::pltf::getDevByIdx<alpaka::pltf::Pltf<HostDevice> >(0u));

   // Device buffer that the verification kernel reads.
   auto sourceMem = alpaka::mem::buf::alloc<double, size_t>(computeDevice, size);

   // First task in the queue: the kernel that only asserts argc == 1.
   alpaka::queue::enqueue(computeStream, alpaka::kernel::createTaskKernel<ComputeAccelerator>(wd, EmptyKernel(), argc));

   // Host data (all 1.0), wrapped in a view and copied to the device through the queue.
   std::vector<double> sourceMemHost(size, 1.0);
   HostViewType hostBufferView(sourceMemHost.data(), hostDevice, bufferSize);
   alpaka::mem::view::copy(computeStream, sourceMem, hostBufferView, bufferSize);
   alpaka::wait::wait(computeStream);

   // Verification kernel: prints every element that is not 1.0.
   alpaka::queue::enqueue(computeStream,
     alpaka::kernel::createTaskKernel<ComputeAccelerator>(wd, MyKernel(), alpaka::mem::view::getPtrNative(sourceMem)));

   alpaka::wait::wait(computeStream);

   return 0;
}
#endif
Was this page helpful?
0 / 5 - 0 ratings

Related issues

tdd11235813 picture tdd11235813  ·  5 Comments

BenjaminW3 picture BenjaminW3  ·  6 Comments

psychocoderHPC picture psychocoderHPC  ·  4 Comments

ax3l picture ax3l  ·  4 Comments

BenjaminW3 picture BenjaminW3  ·  3 Comments