From 34c3969e38fba30bc1481239a0e4fb5223163fc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Mon, 29 Apr 2024 14:39:16 +0100 Subject: [PATCH 1/2] tracing: add missing locks on read MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It is not safe to access a global hashtable from multiple threads, even if the operations are read-only (it may be concurrently changed by another thread, which then may result in errors in the racing thread). This means we must always take the mutex, and because OCaml doesn't have a reader-writer mutex, we need to take the exclusive mutex. Eventually we should use a better data structure here (immutable maps, or lock-free data structures), but for now fix the data structure that we currently use to be thread-safe. Signed-off-by: Edwin Török --- ocaml/libs/tracing/tracing.ml | 62 ++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/ocaml/libs/tracing/tracing.ml b/ocaml/libs/tracing/tracing.ml index 8327ad3a299..ff175434ad6 100644 --- a/ocaml/libs/tracing/tracing.ml +++ b/ocaml/libs/tracing/tracing.ml @@ -296,7 +296,10 @@ module Spans = struct let spans = Hashtbl.create 100 - let span_count () = Hashtbl.length spans + let span_count () = + Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () -> + Hashtbl.length spans + ) let max_spans = ref 1000 @@ -308,9 +311,15 @@ module Spans = struct let finished_spans = Hashtbl.create 100 - let span_hashtbl_is_empty () = Hashtbl.length spans = 0 + let span_hashtbl_is_empty () = + Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () -> + Hashtbl.length spans = 0 + ) - let finished_span_hashtbl_is_empty () = Hashtbl.length finished_spans = 0 + let finished_span_hashtbl_is_empty () = + Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () -> + Hashtbl.length finished_spans = 0 + ) let add_to_spans ~(span : Span.t) = let key = span.context.trace_id in @@ -373,13 +382,14 @@ module Spans = 
struct match x with | None -> false - | Some (span : Span.t) -> ( - match Hashtbl.find_opt finished_spans span.context.trace_id with - | None -> - false - | Some span_list -> - List.exists (fun x -> x = span) span_list - ) + | Some (span : Span.t) -> + Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () -> + match Hashtbl.find_opt finished_spans span.context.trace_id with + | None -> + false + | Some span_list -> + List.exists (fun x -> x = span) span_list + ) (** since copies the existing finished spans and then clears the existing spans as to only export them once *) let since () = @@ -389,7 +399,10 @@ module Spans = struct copy ) - let dump () = Hashtbl.(copy spans, Hashtbl.copy finished_spans) + let dump () = + Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () -> + Hashtbl.(copy spans, Hashtbl.copy finished_spans) + ) module GC = struct let lock = Mutex.create () @@ -538,9 +551,12 @@ let lock = Mutex.create () let tracer_providers = Hashtbl.create 100 -let get_tracer_providers () = +let get_tracer_providers_unlocked () = Hashtbl.fold (fun _ provider acc -> provider :: acc) tracer_providers [] +let get_tracer_providers () = + Xapi_stdext_threads.Threadext.Mutex.execute lock get_tracer_providers_unlocked + let set ?enabled ?attributes ?endpoints ~uuid () = Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () -> let provider = @@ -561,17 +577,17 @@ let set ?enabled ?attributes ?endpoints ~uuid () = failwith (Printf.sprintf "The TracerProvider : %s does not exist" uuid) in - Hashtbl.replace tracer_providers uuid provider - ) ; - if - List.for_all - (fun provider -> not provider.TracerProvider.enabled) - (get_tracer_providers ()) - then - Xapi_stdext_threads.Threadext.Mutex.execute Spans.lock (fun () -> - Hashtbl.clear Spans.spans ; - Hashtbl.clear Spans.finished_spans - ) + Hashtbl.replace tracer_providers uuid provider ; + if + List.for_all + (fun provider -> not provider.TracerProvider.enabled) + (get_tracer_providers_unlocked ()) + then + 
Xapi_stdext_threads.Threadext.Mutex.execute Spans.lock (fun () -> + Hashtbl.clear Spans.spans ; + Hashtbl.clear Spans.finished_spans + ) + ) let create ~enabled ~attributes ~endpoints ~name_label ~uuid = let endpoints = List.map endpoint_of_string endpoints in From 3ce95814fb95405770fbc2e599e5e8b3d0bb82f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Mon, 29 Apr 2024 14:41:22 +0100 Subject: [PATCH 2/2] tracing: replace global ref with Atomic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In preparation for OCaml 5; on OCaml 4 they'd be equivalent. Note that adding Atomic doesn't make operations on these values always atomic: that is the responsibility of surrounding code. E.g. Atomic.get + Atomic.set is not atomic, because another domain might've raced and changed the value in between (so in that case Atomic.compare_and_set should be used). However, for global flags that are read multiple times, but set from a central place, this isn't a problem. 
Signed-off-by: Edwin Török --- ocaml/libs/tracing/tracing.ml | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/ocaml/libs/tracing/tracing.ml b/ocaml/libs/tracing/tracing.ml index ff175434ad6..d010828d508 100644 --- a/ocaml/libs/tracing/tracing.ml +++ b/ocaml/libs/tracing/tracing.ml @@ -84,9 +84,9 @@ let validate_attribute (key, value) = && Re.execp attribute_key_regex key && W3CBaggage.Key.is_valid_key key -let observe = ref true +let observe = Atomic.make true -let set_observe mode = observe := mode +let set_observe mode = Atomic.set observe mode module SpanKind = struct type t = Server | Consumer | Client | Producer | Internal [@@deriving rpcty] @@ -301,13 +301,13 @@ module Spans = struct Hashtbl.length spans ) - let max_spans = ref 1000 + let max_spans = Atomic.make 1000 - let set_max_spans x = max_spans := x + let set_max_spans x = Atomic.set max_spans x - let max_traces = ref 1000 + let max_traces = Atomic.make 1000 - let set_max_traces x = max_traces := x + let set_max_traces x = Atomic.set max_traces x let finished_spans = Hashtbl.create 100 @@ -326,13 +326,13 @@ module Spans = struct Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () -> match Hashtbl.find_opt spans key with | None -> - if Hashtbl.length spans < !max_traces then + if Hashtbl.length spans < Atomic.get max_traces then Hashtbl.add spans key [span] else debug "%s exceeded max traces when adding to span table" __FUNCTION__ | Some span_list -> - if List.length span_list < !max_spans then + if List.length span_list < Atomic.get max_spans then Hashtbl.replace spans key (span :: span_list) else debug "%s exceeded max traces when adding to span table" @@ -363,13 +363,13 @@ module Spans = struct Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () -> match Hashtbl.find_opt finished_spans key with | None -> - if Hashtbl.length finished_spans < !max_traces then + if Hashtbl.length finished_spans < Atomic.get max_traces then Hashtbl.add 
finished_spans key [span] else debug "%s exceeded max traces when adding to finished span table" __FUNCTION__ | Some span_list -> - if List.length span_list < !max_spans then + if List.length span_list < Atomic.get max_spans then Hashtbl.replace finished_spans key (span :: span_list) else debug "%s exceeded max traces when adding to finished span table" @@ -407,7 +407,7 @@ module Spans = struct module GC = struct let lock = Mutex.create () - let span_timeout = ref 86400. + let span_timeout = Atomic.make 86400. let span_timeout_thread = ref None @@ -421,7 +421,7 @@ module Spans = struct let elapsed = Unix.gettimeofday () -. span.Span.begin_time in - if elapsed > !span_timeout *. 1000000. then ( + if elapsed > Atomic.get span_timeout *. 1000000. then ( debug "Tracing: Span %s timed out, forcibly finishing now" span.Span.context.span_id ; let span = @@ -444,14 +444,14 @@ module Spans = struct ) let initialise_thread ~timeout = - span_timeout := timeout ; + Atomic.set span_timeout timeout ; span_timeout_thread := Some (Thread.create (fun () -> while true do debug "Tracing: Span garbage collector" ; - Thread.delay !span_timeout ; + Thread.delay (Atomic.get span_timeout) ; gc_inactive_spans () done ) @@ -631,7 +631,7 @@ let enable_span_garbage_collector ?(timeout = 86400.) () = Spans.GC.initialise_thread ~timeout let with_tracing ?(attributes = []) ?(parent = None) ~name f = - if not !observe then + if not (Atomic.get observe) then f None else let tracer = get_tracer ~name in