Explorar el Código

Add a more friendly "latch" synchronization tool (#6372)

The standard `std::latch` is very restrictive in how it can be used, and
this makes it hard to easily leverage for simple coordination between a
set of dynamically scheduled tasks, where there isn't an interesting
synchronizing "merge" or future result.

This tool makes it easy to establish a latch, hand out handles to it,
and once all are destroyed, take whatever relevant action.

Note: this is split out of a larger change that uses it. I can wait
until the use case is ready, but seemed nice to review this separately.

---------

Co-authored-by: Dana Jansens <danakj@orodu.net>
Chandler Carruth hace 5 meses
padre
commit
4024d300bc
Se han modificado 4 ficheros con 354 adiciones y 0 borrados
  1. 22 0
      common/BUILD
  2. 60 0
      common/latch.cpp
  3. 135 0
      common/latch.h
  4. 137 0
      common/latch_test.cpp

+ 22 - 0
common/BUILD

@@ -414,6 +414,28 @@ cc_library(
     alwayslink = 1,
 )
 
+cc_library(
+    name = "latch",
+    srcs = ["latch.cpp"],
+    hdrs = ["latch.h"],
+    deps = [
+        ":check",
+        "@llvm-project//llvm:Support",
+    ],
+)
+
+cc_test(
+    name = "latch_test",
+    size = "small",
+    srcs = ["latch_test.cpp"],
+    deps = [
+        ":latch",
+        "//testing/base:gtest_main",
+        "@googletest//:gtest",
+        "@llvm-project//llvm:Support",
+    ],
+)
+
 cc_library(
     name = "map",
     hdrs = ["map.h"],

+ 60 - 0
common/latch.cpp

@@ -0,0 +1,60 @@
+// Part of the Carbon Language project, under the Apache License v2.0 with LLVM
+// Exceptions. See /LICENSE for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+
+#include "common/latch.h"
+
+#include "common/check.h"
+
+namespace Carbon {
+
+auto Latch::Inc() -> void {
+  // The increment must be _atomic_ but is _relaxed_.
+  //
+  // Increments and decrements can happen concurrently on separate threads, so
+  // we need to prevent tearing and for there to be a total ordering of stores
+  // to this atomic.
+  //
+  // However we provide no _synchronization_ of the increment with any other
+  // operations. Instead, the caller must provide some extrinsic happens-before
+  // between its call to `Inc` and its later call to `Dec`. When that call to
+  // `Dec` synchronizes-with another call to `Dec`, all relaxed stores are
+  // covered by the resulting inter-thread happens-before relationship.
+  count_.fetch_add(1, std::memory_order_relaxed);
+}
+
+auto Latch::Dec() -> bool {
+  // The decrement is both an _acquire_ and _release_ operation.
+  //
+  // All threads which decrement to a non-zero value need to synchronize-with
+  // the thread which decrements to a zero value. This means the decrements to
+  // non-zero values need to have _release_ semantics that are _acquired_ by the
+  // decrement to zero. Since there is a single decrement operation, it must be
+  // both _acquire_ and _release_.
+  //
+  // Note that this technically provides a stronger guarantee than the contract
+  // of `Dec` requires -- *all* decrements synchronize with all decrements whose
+  // value they observe, we only need that to be true of the decrement arriving
+  // at zero. This could in theory be modeled by conditional fences, but those
+  // have their own problems and we don't need to model the more precise
+  // semantics for efficiency.
+  auto previous = count_.fetch_sub(1, std::memory_order_acq_rel);
+
+  CARBON_CHECK(previous > 0);
+  if (previous == 1) {
+    // Ensure that our closure is fully destroyed here, releasing any
+    // resources, locks, or other synchronization primitives.
+    auto on_zero = std::exchange(on_zero_, [] {});
+    std::move(on_zero)();
+    return true;
+  }
+  return false;
+}
+
+auto Latch::Init(llvm::unique_function<auto()->void> on_zero) -> Handle {
+  CARBON_CHECK(count_ == 0);
+  on_zero_ = std::move(on_zero);
+  return Handle(this);
+}
+
+}  // namespace Carbon

+ 135 - 0
common/latch.h

@@ -0,0 +1,135 @@
+// Part of the Carbon Language project, under the Apache License v2.0 with LLVM
+// Exceptions. See /LICENSE for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+
+#ifndef CARBON_COMMON_LATCH_H_
+#define CARBON_COMMON_LATCH_H_
+
+#include <atomic>
+
+#include "llvm/ADT/FunctionExtras.h"
+
+namespace Carbon {
+
+// A synchronization primitive similar to `std::latch` to coordinate starting
+// some action once all of a set of other actions complete.
+//
+// Users initialize the latch (with `Init`), and receive a handle RAII object.
+// This handle can be copied, and the latch is satisfied when the last copy of
+// the handle returned by `Init` is destroyed.
+//
+// The latch synchronizes between every destruction of a handle and the
+// destruction of the last handle, allowing code that runs after the latch is
+// satisfied to access everything written by any thread that destroyed a handle.
+// For more details of the synchronization mechanics, see the comments on `Inc`
+// and `Dec` that implement this logic.
+//
+// This type also supports holding a closure to run when satisfied to simplify
+// patterns where that body of code is easier to express at the start of work
+// being synchronized instead of as each work item completes.
+//
+// The initialization API is separate from the constructor both for convenience
+// and to enable it to provide the initial handle. This makes it easy to build
+// constructively correct code where each work unit holds a handle until
+// finished, including the initializer of the latch, often using by-value
+// captures in a lambda that does the work.
+class Latch {
+ public:
+  class Handle;
+
+  Latch() = default;
+  Latch(const Latch&) = delete;
+  Latch(Latch&&) = delete;
+
+  // Initialize a latch and get the initial handle to it.
+  //
+  // When the last copy of the returned handle is destroyed, the latch will be
+  // satisfied.
+  //
+  // A closure may be provided which will be called when that last handle is
+  // destroyed. Note that the closure will run on whatever thread executes the
+  // last handle destruction. Typically, the closure here should _schedule_ the
+  // next step of work on some thread pool rather than performing it directly.
+  //
+  // Once this method is called, it cannot be called again until all handles are
+  // destroyed and the latch is satisfied. It can then be called again to get a
+  // fresh handle (and provide a new closure if desired).
+  auto Init(llvm::unique_function<auto()->void> on_zero = [] {}) -> Handle;
+
+ private:
+  // Increments the latch's counter.
+  //
+  // This is thread-safe, and may be called concurrently on multiple threads,
+  // and may be called concurrently with `Dec`. However, the caller _must_ call
+  // `Inc` and then `Dec`, and provide some happens-before relationship between
+  // the `Inc` and `Dec`. Typically, this is done with either same-thread
+  // happens-before, or because some other synchronization event such as
+  // starting a thread or popping a task from a thread pool provides the
+  // inter-thread happens-before relationship.
+  auto Inc() -> void;
+
+  // Decrements the latch's counter, and returns true when it reaches zero.
+  //
+  // This is thread-safe, and may be called concurrently with other calls to
+  // `Dec` or `Inc`.
+  //
+  // It also ensures that all threads which call `Dec` and receive `false`
+  // synchronize-with the thread that calls `Dec` and receives `true`. As a
+  // consequence everything that happens-before the call to `Dec` has an
+  // inter-thread happens-before for any code when `Dec` returns `true`.
+  //
+  // Note that there is no guarantee of inter-thread happens-before to
+  // operations after a `Dec` call that returns `false`.
+  auto Dec() -> bool;
+
+  std::atomic<int> count_;
+  llvm::unique_function<auto()->void> on_zero_;
+};
+
+// A copyable RAII handle around a `Latch`.
+//
+// When the last copy of a handle returned by `Latch::Init` is destroyed, the
+// latch is considered satisfied. Copying is supported by incrementing the
+// count of the latch. That increment can always be performed because it starts
+// from a live handle and so the count cannot have reached zero.
+//
+// For more details, see the `Latch` class.
+class Latch::Handle {
+ public:
+  Handle(const Handle& arg) : latch_(arg.latch_) {
+    if (latch_) {
+      arg.latch_->Inc();
+    }
+  }
+  Handle(Handle&& arg) noexcept : latch_(std::exchange(arg.latch_, nullptr)) {}
+
+  ~Handle() {
+    if (latch_) {
+      latch_->Dec();
+    }
+  }
+
+  // Drops a handle explicitly, rather than waiting for it to fall out of scope.
+  //
+  // This also allows observing whether the underlying latch is satisfied.
+  // Calls to this function synchronize with all other drops or destructions of
+  // latch handles when it returns true, and only the last will return true.
+  auto Drop() && -> bool {
+    bool last = latch_->Dec();
+    latch_ = nullptr;
+    return last;
+  }
+
+ private:
+  friend Latch;
+
+  // Private constructor used by `Latch::Init` to create the initial handle for
+  // a latch.
+  explicit Handle(Latch* latch) : latch_(latch) { latch_->Inc(); }
+
+  Latch* latch_ = nullptr;
+};
+
+}  // namespace Carbon
+
+#endif  // CARBON_COMMON_LATCH_H_

+ 137 - 0
common/latch_test.cpp

@@ -0,0 +1,137 @@
+// Part of the Carbon Language project, under the Apache License v2.0 with LLVM
+// Exceptions. See /LICENSE for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+
+#include "common/latch.h"
+
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <thread>
+#include <vector>
+
+namespace Carbon {
+namespace {
+
+// Basic test for the Latch.
+TEST(LatchTest, Basic) {
+  Latch latch;
+  Latch::Handle handle = latch.Init();
+  // Dropping a copy doesn't satisfy the latch.
+  Latch::Handle handle_copy = handle;
+  EXPECT_FALSE(std::move(handle_copy).Drop());
+  // Can create more copies even after we start dropping.
+  Latch::Handle handle_copy2 = handle;
+  EXPECT_FALSE(std::move(handle_copy2).Drop());
+  // Now drop the last handle.
+  EXPECT_TRUE(std::move(handle).Drop());
+}
+
+// Tests that the on-zero callback is called.
+TEST(LatchTest, OnZeroCallback) {
+  Latch latch;
+  bool called = false;
+  Latch::Handle handle = latch.Init([&] { called = true; });
+  Latch::Handle handle2 = handle;
+  Latch::Handle handle3 = handle;
+
+  EXPECT_FALSE(called);
+  EXPECT_FALSE(std::move(handle).Drop());
+  EXPECT_FALSE(called);
+  EXPECT_FALSE(std::move(handle2).Drop());
+  EXPECT_FALSE(called);
+  EXPECT_TRUE(std::move(handle3).Drop());
+  EXPECT_TRUE(called);
+}
+
+// Tests moving a handle.
+TEST(LatchTest, MoveHandle) {
+  Latch latch;
+  bool called = false;
+  Latch::Handle handle = latch.Init([&] { called = true; });
+  Latch::Handle handle2 = std::move(handle);
+
+  // Check that dropping the new handle satisfies the latch.
+  EXPECT_FALSE(called);
+  EXPECT_TRUE(std::move(handle2).Drop());
+  EXPECT_TRUE(called);
+}
+
+// Test that creating and destroying a handle without dropping works.
+TEST(LatchTest, Destructor) {
+  Latch latch;
+  bool called = false;
+  Latch::Handle handle = latch.Init([&] { called = true; });
+  {
+    // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
+    Latch::Handle handle2 = handle;
+    EXPECT_FALSE(called);
+  }
+  EXPECT_FALSE(called);
+  EXPECT_TRUE(std::move(handle).Drop());
+  EXPECT_TRUE(called);
+}
+
+// Tests calling `Init` more than once.
+TEST(LatchTest, Reuse) {
+  Latch latch;
+  bool called = false;
+  Latch::Handle handle = latch.Init([&] { called = true; });
+  Latch::Handle handle2 = handle;
+
+  EXPECT_FALSE(called);
+  EXPECT_FALSE(std::move(handle).Drop());
+  EXPECT_FALSE(called);
+  EXPECT_TRUE(std::move(handle2).Drop());
+  EXPECT_TRUE(called);
+
+  // Now initialize the latch again with a new closure.
+  bool called2 = false;
+  Latch::Handle handle3 = latch.Init([&] { called2 = true; });
+  Latch::Handle handle4 = handle3;
+
+  EXPECT_FALSE(called2);
+  EXPECT_FALSE(std::move(handle3).Drop());
+  EXPECT_FALSE(called2);
+  EXPECT_TRUE(std::move(handle4).Drop());
+  EXPECT_TRUE(called2);
+}
+
+// Tests the latch with multiple threads.
+TEST(LatchTest, MultiThreaded) {
+  Latch latch;
+  std::atomic<int> counter = 0;
+  bool called = false;
+  constexpr int NumThreads = 5;
+
+  // The `on_zero` callback will be executed by the last thread to drop its
+  // handle.
+  auto handle = latch.Init([&] {
+    // Check that all threads have done their work.
+    EXPECT_EQ(counter.load(), NumThreads);
+    called = true;
+  });
+
+  std::vector<std::thread> threads;
+  threads.reserve(NumThreads);
+  for (int i = 0; i < NumThreads; ++i) {
+    threads.emplace_back([&, handle_copy = handle] {
+      // Each thread has its own copy of the handle.
+      // Simulate some work.
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      counter++;
+      // The handle is dropped when the thread exits.
+    });
+  }
+
+  // Drop the main thread's handle.
+  std::move(handle).Drop();
+
+  for (auto& thread : threads) {
+    thread.join();
+  }
+  EXPECT_TRUE(called);
+}
+
+}  // namespace
+}  // namespace Carbon