Unverified commit 7aab8ba1 authored by Yorick Peterse

WIP more parallel GC

parent 432cb60e
Showing changed files with 1374 additions and 1354 deletions
//! Types and methods for scheduling garbage collection of a process.
use crate::gc::statistics::{CollectionStatistics, TraceStatistics};
use crate::gc::tracer::Tracer;
use crate::mailbox::Mailbox;
use crate::process::RcProcess;
use crate::vm::state::State;
use rayon::prelude::*;
use std::time::Instant;
 
/// A garbage collection to perform.
pub struct Collection {
/// The process that is being garbage collected.
process: RcProcess,

/// The time at which garbage collection started.
start_time: Instant,
}
 
impl Collection {
pub fn new(process: RcProcess) -> Self {
Collection {
process,
start_time: Instant::now(),
}
}
 
/// Starts garbage collecting the process.
pub fn perform(&self, vm_state: &State) -> CollectionStatistics {
// We must lock the mailbox before performing any work, as otherwise new
// objects may be allocated during garbage collection.
let local_data = self.process.local_data_mut();
let mut mailbox = local_data.mailbox.lock();
let collect_mature = self.process.should_collect_mature_generation();
let move_objects = self.process.prepare_for_collection(collect_mature);
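// `move_objects` is true when tracing may have to move objects around
// (for example to evacuate them out of fragmented blocks), in which
// case the moving variant of the tracer is used in trace() below.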
let trace_stats =
self.trace(&mut mailbox, move_objects, collect_mature);
self.process.reclaim_blocks(vm_state, collect_mature);
self.process
.update_collection_statistics(&vm_state.config, collect_mature);
// We drop the mutex guard before rescheduling so the process can
// immediately start receiving messages again, and so it can send itself
// messages.
drop(mailbox);
vm_state.scheduler.schedule(self.process.clone());
let stats = CollectionStatistics {
duration: self.start_time.elapsed(),
trace: trace_stats,
};
 
println!(
"[{:#x}] GC in {:?}, {} marked, {} promoted, {} evacuated",
self.process.identifier(),
stats.duration,
stats.trace.marked,
stats.trace.promoted,
stats.trace.evacuated
);
stats
}
/// Traces through and marks all reachable objects.
fn trace(
&self,
mailbox: &mut Mailbox,
move_objects: bool,
mature: bool,
) -> TraceStatistics {
let globals = self.process.global_pointers_to_trace();
let messages = mailbox.pointers();
// TODO: collect from tracers.
let mut stats = TraceStatistics::new();
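// Each execution context is traced in parallel using rayon, with every
// tracing thread producing its own TraceStatistics that are reduced into
// a single total. Global pointers and mailbox messages are then traced
// on the calling thread.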
if move_objects {
stats += self
.process
.contexts()
.par_iter()
.map(|context| {
Tracer::new(self.process.clone())
.trace_with_moving(context.pointers(), mature)
})
.reduce(|| TraceStatistics::new(), |acc, curr| acc + curr);
stats += Tracer::new(self.process.clone())
.trace_with_moving(globals, mature);
stats += Tracer::new(self.process.clone())
.trace_with_moving(messages, mature);
} else {
stats += self
.process
.contexts()
.par_iter()
.map(|context| {
Tracer::new(self.process.clone())
.trace_without_moving(context.pointers(), mature)
})
.reduce(|| TraceStatistics::new(), |acc, curr| acc + curr);
// TODO: reuse Tracer
stats += Tracer::new(self.process.clone())
.trace_without_moving(globals, mature);
stats += Tracer::new(self.process.clone())
.trace_without_moving(messages, mature);
}
if mature {
// During a mature collection we don't examine the remembered set
// since we already traverse all mature objects. This allows us to
// remove any unmarked mature objects from the remembered set.
self.process.prune_remembered_set();
} else if self.process.has_remembered_objects() {
let remembered = self.process.remembered_pointers();
// TODO: reuse Tracer
if move_objects {
stats += Tracer::new(self.process.clone())
.trace_with_moving(remembered, true);
} else {
stats += Tracer::new(self.process.clone())
.trace_without_moving(remembered, true);
}
}
stats
}
}
 
#[cfg(test)]
mod tests {
use super::*;
use crate::binding::Binding;
use crate::block::Block;
use crate::config::Config;
use crate::object::Object;
use crate::object_pointer::ObjectPointer;
use crate::object_value;
use crate::vm::state::State;
use crate::vm::test::setup;
 
@@ -46,11 +150,364 @@ mod tests {
fn test_perform() {
let (_machine, _block, process) = setup();
let state = State::with_rc(Config::new(), &[]);
let pointer = process.allocate_empty();
let collection = Collection::new(process.clone());
process.set_register(0, pointer);
let stats = collection.perform(&state);
assert_eq!(stats.trace.marked, 1);
assert_eq!(stats.trace.evacuated, 0);
assert_eq!(stats.trace.promoted, 0);
assert!(pointer.is_marked());
}
#[test]
fn test_trace_trace_without_moving_without_mature() {
let (_machine, _block, process) = setup();
let collection = Collection::new(process.clone());
let young = process.allocate_empty();
let mature = process
.local_data_mut()
.allocator
.allocate_mature(Object::new(object_value::none()));
process.set_register(0, young);
process.set_register(1, mature);
let stats = collection.trace(
&mut process.local_data_mut().mailbox.lock(),
false,
false,
);
assert_eq!(stats.marked, 1);
assert_eq!(stats.evacuated, 0);
assert_eq!(stats.promoted, 0);
}
#[test]
fn test_trace_trace_without_moving_with_mature() {
let (_machine, _block, process) = setup();
let collection = Collection::new(process.clone());
let young = process.allocate_empty();
let mature = process
.local_data_mut()
.allocator
.allocate_mature(Object::new(object_value::none()));
process.set_register(0, young);
process.set_register(1, mature);
let stats = collection.trace(
&mut process.local_data_mut().mailbox.lock(),
false,
true,
);
assert_eq!(stats.marked, 2);
assert_eq!(stats.evacuated, 0);
assert_eq!(stats.promoted, 0);
}
#[test]
fn test_trace_trace_with_moving_without_mature() {
let (_machine, _block, process) = setup();
let collection = Collection::new(process.clone());
let young = process.allocate_empty();
let mature = process
.local_data_mut()
.allocator
.allocate_mature(Object::new(object_value::none()));
young.block_mut().set_fragmented();
process.set_register(0, young);
process.set_register(1, mature);
let stats = collection.trace(
&mut process.local_data_mut().mailbox.lock(),
true,
false,
);
assert_eq!(stats.marked, 1);
assert_eq!(stats.evacuated, 1);
assert_eq!(stats.promoted, 0);
}
#[test]
fn test_trace_trace_with_moving_with_mature() {
let (_machine, _block, process) = setup();
let collection = Collection::new(process.clone());
let young = process.allocate_empty();
let mature = process
.local_data_mut()
.allocator
.allocate_mature(Object::new(object_value::none()));
young.block_mut().set_fragmented();
mature.block_mut().set_fragmented();
process.set_register(0, young);
process.set_register(1, mature);
let stats = collection.trace(
&mut process.local_data_mut().mailbox.lock(),
true,
true,
);
assert_eq!(stats.marked, 2);
assert_eq!(stats.evacuated, 2);
assert_eq!(stats.promoted, 0);
}
#[test]
fn test_trace_remembered_set_without_moving() {
let (_machine, _block, process) = setup();
let collection = Collection::new(process.clone());
let local_data = process.local_data_mut();
let pointer1 = local_data
.allocator
.allocate_mature(Object::new(object_value::none()));
local_data.allocator.remember_object(pointer1);
process.prepare_for_collection(false);
let stats = collection.trace(
&mut process.local_data_mut().mailbox.lock(),
false,
false,
);
let remembered =
local_data.allocator.remembered_set.iter().next().unwrap();
assert!(remembered.is_marked());
assert_eq!(stats.marked, 1);
assert_eq!(stats.evacuated, 0);
assert_eq!(stats.promoted, 0);
}
#[test]
fn test_trace_remembered_set_with_moving() {
let (_machine, _block, process) = setup();
let collection = Collection::new(process.clone());
let local_data = process.local_data_mut();
let pointer1 = local_data
.allocator
.allocate_mature(Object::new(object_value::float(4.5)));
pointer1.block_mut().set_fragmented();
local_data.allocator.remember_object(pointer1);
process.prepare_for_collection(false);
let stats = collection.trace(
&mut process.local_data_mut().mailbox.lock(),
true,
false,
);
let remembered =
local_data.allocator.remembered_set.iter().next().unwrap();
assert_eq!(remembered.block().is_fragmented(), false);
assert!(remembered.is_marked());
assert!(remembered.float_value().is_ok());
assert_eq!(stats.marked, 1);
assert_eq!(stats.evacuated, 1);
assert_eq!(stats.promoted, 0);
}
#[test]
fn test_prune_remembered_set() {
let (_machine, _block, process) = setup();
let collection = Collection::new(process.clone());
let local_data = process.local_data_mut();
let pointer1 = local_data
.allocator
.allocate_mature(Object::new(object_value::none()));
let pointer2 = local_data
.allocator
.allocate_mature(Object::new(object_value::none()));
process.set_register(0, pointer2);
local_data.allocator.remember_object(pointer1);
local_data.allocator.remember_object(pointer2);
let stats = collection.trace(
&mut process.local_data_mut().mailbox.lock(),
false,
true,
);
let mut iter = local_data.allocator.remembered_set.iter();
assert!(iter.next() == Some(&pointer2));
assert!(iter.next().is_none());
assert_eq!(stats.marked, 1);
assert_eq!(stats.evacuated, 0);
assert_eq!(stats.promoted, 0);
}
#[test]
fn test_trace_mailbox_with_moving_without_mature() {
let (_machine, _block, process) = setup();
let young = process.allocate_empty();
let collection = Collection::new(process.clone());
let mature = process
.local_data_mut()
.allocator
.allocate_mature(Object::new(object_value::none()));
young.block_mut().set_fragmented();
process.send_message_from_self(young);
process.send_message_from_self(mature);
process.prepare_for_collection(false);
let stats = collection.trace(
&mut process.local_data_mut().mailbox.lock(),
true,
false,
);
assert_eq!(stats.marked, 1);
assert_eq!(stats.evacuated, 1);
assert_eq!(stats.promoted, 0);
assert_eq!(mature.is_marked(), false);
}
#[test]
fn test_trace_mailbox_with_moving_with_mature() {
let (_machine, _block, process) = setup();
let young = process.allocate_empty();
let collection = Collection::new(process.clone());
let mature = process
.local_data_mut()
.allocator
.allocate_mature(Object::new(object_value::none()));
young.block_mut().set_fragmented();
process.send_message_from_self(young);
process.send_message_from_self(mature);
process.prepare_for_collection(true);
let stats = collection.trace(
&mut process.local_data_mut().mailbox.lock(),
true,
true,
);
assert_eq!(stats.marked, 2);
assert_eq!(stats.evacuated, 1);
assert_eq!(stats.promoted, 0);
assert!(mature.is_marked());
}
#[test]
fn test_trace_mailbox_without_moving_without_mature() {
let (_machine, _block, process) = setup();
let young = process.allocate_empty();
let collection = Collection::new(process.clone());
let mature = process
.local_data_mut()
.allocator
.allocate_mature(Object::new(object_value::none()));
process.send_message_from_self(young);
process.send_message_from_self(mature);
process.prepare_for_collection(false);
let stats = collection.trace(
&mut process.local_data_mut().mailbox.lock(),
false,
false,
);
assert_eq!(stats.marked, 1);
assert_eq!(stats.evacuated, 0);
assert_eq!(stats.promoted, 0);
assert!(young.is_marked());
assert_eq!(mature.is_marked(), false);
}
#[test]
fn test_trace_mailbox_without_moving_with_mature() {
let (_machine, _block, process) = setup();
let young = process.allocate_empty();
let collection = Collection::new(process.clone());
let mature = process
.local_data_mut()
.allocator
.allocate_mature(Object::new(object_value::none()));
process.send_message_from_self(young);
process.send_message_from_self(mature);
process.prepare_for_collection(true);
let stats = collection.trace(
&mut process.local_data_mut().mailbox.lock(),
false,
true,
);
assert_eq!(stats.marked, 2);
assert_eq!(stats.evacuated, 0);
assert_eq!(stats.promoted, 0);
assert!(young.is_marked());
assert!(mature.is_marked());
}
#[test]
fn test_trace_with_panic_handler() {
let (_machine, block, process) = setup();
let collection = Collection::new(process.clone());
let local = process.allocate_empty();
let receiver = process.allocate_empty();
let code = process.context().code.clone();
let mut binding = Binding::with_rc(1, ObjectPointer::integer(1));
binding.set_local(0, local);
let new_block =
Block::new(code, Some(binding), receiver, block.global_scope);
let panic_handler =
process.allocate_without_prototype(object_value::block(new_block));
process.set_panic_handler(panic_handler);
process.prepare_for_collection(false);
let stats = collection.trace(
&mut process.local_data_mut().mailbox.lock(),
false,
false,
);
 
assert_eq!(stats.marked, 3);
assert_eq!(stats.evacuated, 0);
assert_eq!(stats.promoted, 0);
 
assert!(panic_handler.is_marked());
assert!(receiver.is_marked());
assert!(local.is_marked());
}
}
//! Types for starting and coordinating garbage collection of a process.
use crate::arc_without_weak::ArcWithoutWeak;
use crate::gc::collection::Collection;
use crate::scheduler::join_list::JoinList;
use crate::scheduler::pool_state::PoolState;
use crate::scheduler::queue::RcQueue;
use crate::scheduler::worker::Worker as WorkerTrait;
use crate::vm::state::RcState;
use std::thread;
/// A worker used for coordinating the garbage collection of a process.
pub struct Worker {
/// The queue owned by this worker.
queue: RcQueue<Collection>,
 
/// The state of the pool this worker belongs to.
state: ArcWithoutWeak<PoolState<Collection>>,
/// The VM state this worker belongs to.
vm_state: RcState,
}
 
impl Worker {
pub fn new(
queue: RcQueue<Collection>,
state: ArcWithoutWeak<PoolState<Collection>>,
vm_state: RcState,
) -> Self {
Worker {
queue,
state,
vm_state,
}
}
}
 
impl WorkerTrait<Collection> for Worker {
fn state(&self) -> &PoolState<Collection> {
&self.state
}
@@ -56,102 +42,139 @@ impl Worker<Collection> for GarbageCollectionWorker {
fn queue(&self) -> &RcQueue<Collection> {
&self.queue
}
fn process_job(&mut self, job: Collection) {
job.perform(&self.vm_state);
}
}
/// A pool of threads for coordinating the garbage collection of processes.
pub struct Pool {
state: ArcWithoutWeak<PoolState<Collection>>,
}
impl Pool {
pub fn new(threads: usize) -> Self {
assert!(threads > 0, "GC pools require at least one thread");
Self {
state: ArcWithoutWeak::new(PoolState::new(threads)),
}
}
/// Schedules a job onto the global queue.
pub fn schedule(&self, job: Collection) {
self.state.push_global(job);
}
/// Informs this pool it should terminate as soon as possible.
pub fn terminate(&self) {
self.state.terminate();
}
/// Starts the pool, without blocking the calling thread.
pub fn start(&self, vm_state: RcState) -> JoinList<()> {
let handles = self
.state
.queues
.iter()
.enumerate()
.map(|(index, queue)| {
self.spawn_thread(index, queue.clone(), vm_state.clone())
})
.collect();
JoinList::new(handles)
}
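// Each spawned thread runs the worker loop provided by the shared Worker
// trait, which ends up calling process_job() above for every scheduled
// Collection.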
fn spawn_thread(
&self,
id: usize,
queue: RcQueue<Collection>,
vm_state: RcState,
) -> thread::JoinHandle<()> {
let state = self.state.clone();
thread::Builder::new()
.name(format!("GC {}", id))
.spawn(move || {
Worker::new(queue, state, vm_state).run();
})
.unwrap()
}
}
 
#[cfg(test)]
mod tests {
use super::*;
use crate::arc_without_weak::ArcWithoutWeak;
use crate::config::Config;
use crate::gc::collection::Collection;
use crate::vm::state::State;
use crate::vm::test::setup;
use parking_lot::Mutex;
 
fn worker() -> Worker {
let state = ArcWithoutWeak::new(PoolState::new(2));
let vm_state = State::with_rc(Config::new(), &[]);
 
Worker::new(state.queues[0].clone(), state, vm_state)
}
 
#[test]
fn test_worker_run_global_jobs() {
let (_machine, _block, process) = setup();
let mut worker = worker();
 
worker.state.push_global(Collection::new(process));
worker.run();
 
assert_eq!(worker.state.queues[1].has_local_jobs(), false);
assert_eq!(worker.state.has_global_jobs(), false);
}
 
#[test]
fn test_worker_run_steal_then_terminate() {
let (_machine, _block, process) = setup();
let mut worker = worker();
 
worker.state.queues[1].push_internal(Collection::new(process));
worker.run();
 
assert_eq!(worker.state.queues[1].has_local_jobs(), false);
}
 
#[test]
fn test_worker_run_steal_then_work() {
let (_machine, _block, process) = setup();
let mut worker = worker();
 
worker.state.queues[1].push_internal(Collection::new(process.clone()));
worker.queue.push_internal(Collection::new(process));
worker.run();
 
assert_eq!(worker.state.queues[1].has_local_jobs(), false);
assert_eq!(worker.queue.has_local_jobs(), false);
}
 
#[test]
#[should_panic]
fn test_pool_new_with_zero_threads() {
Pool::new(0);
}
 
#[test]
fn test_pool_spawn_thread() {
let (machine, _block, process) = setup();
let pool = Pool::new(1);
pool.schedule(Collection::new(process));
let thread = pool.spawn_thread(
0,
pool.state.queues[0].clone(),
machine.state.clone(),
);
 
thread.join().unwrap();
 
assert_eq!(pool.state.has_global_jobs(), false);
}
}
pub mod collection;
pub mod heap_collector;
pub mod profile;
pub mod coordinator;
pub mod remembered_set;
pub mod statistics;
pub mod tracer;
pub mod work_list;
//! Recording of garbage collection statistics.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
/// A type used for recording garbage collection statistics, such as the number
/// of marked objects.
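///
/// The counters are atomic so that multiple tracing threads can update the
/// same profile concurrently without additional locking.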
pub struct Profile {
/// The number of marked objects.
pub marked: AtomicUsize,
/// The number of evacuated objects.
pub evacuated: AtomicUsize,
/// The number of objects promoted to the full generation.
pub promoted: AtomicUsize,
/// The time at which garbage collection was scheduled.
pub start_time: Instant,
}
impl Profile {
pub fn new() -> Self {
Profile {
marked: AtomicUsize::new(0),
evacuated: AtomicUsize::new(0),
promoted: AtomicUsize::new(0),
start_time: Instant::now(),
}
}
pub fn add_marked(&self) {
self.marked.fetch_add(1, Ordering::Relaxed);
}
pub fn add_evacuated(&self) {
self.evacuated.fetch_add(1, Ordering::Relaxed);
}
pub fn add_promoted(&self) {
self.promoted.fetch_add(1, Ordering::Relaxed);
}
pub fn marked(&self) -> usize {
self.marked.load(Ordering::Acquire)
}
pub fn evacuated(&self) -> usize {
self.evacuated.load(Ordering::Acquire)
}
pub fn promoted(&self) -> usize {
self.promoted.load(Ordering::Acquire)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new() {
let profile = Profile::new();
assert_eq!(profile.marked(), 0);
assert_eq!(profile.evacuated(), 0);
assert_eq!(profile.promoted(), 0);
}
#[test]
fn test_add_marked() {
let profile = Profile::new();
profile.add_marked();
assert_eq!(profile.marked(), 1);
}
#[test]
fn test_add_evacuated() {
let profile = Profile::new();
profile.add_evacuated();
assert_eq!(profile.evacuated(), 1);
}
#[test]
fn test_add_promoted() {
let profile = Profile::new();
profile.add_promoted();
assert_eq!(profile.promoted(), 1);
}
}
//! Types for storing garbage collection statistics.
use std::ops::{Add, AddAssign};
use std::time::Duration;
/// Statistics produced by a single thread tracing objects.
pub struct TraceStatistics {
/// The number of marked objects.
pub marked: usize,
/// The number of promoted objects.
pub promoted: usize,
/// The number of evacuated objects.
pub evacuated: usize,
}
impl TraceStatistics {
pub fn new() -> Self {
TraceStatistics {
marked: 0,
promoted: 0,
evacuated: 0,
}
}
}
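// Add and AddAssign are implemented so that the per-thread statistics
// produced by parallel tracing can be summed into a single total, e.g. via
// the rayon reduce() call in Collection::trace().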
impl Add for TraceStatistics {
type Output = TraceStatistics;
fn add(self, other: Self::Output) -> Self::Output {
Self {
marked: self.marked + other.marked,
promoted: self.promoted + other.promoted,
evacuated: self.evacuated + other.evacuated,
}
}
}
impl AddAssign for TraceStatistics {
fn add_assign(&mut self, other: Self) {
*self = Self {
marked: self.marked + other.marked,
promoted: self.promoted + other.promoted,
evacuated: self.evacuated + other.evacuated,
}
}
}
/// Statistics about a single garbage collection.
pub struct CollectionStatistics {
/// The total time spent garbage collecting.
pub duration: Duration,
/// The statistics produced by tracing objects.
pub trace: TraceStatistics,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_trace_statistics_add() {
let mut stat1 = TraceStatistics::new();
let mut stat2 = TraceStatistics::new();
stat1.marked = 1;
stat1.promoted = 1;
stat1.evacuated = 1;
stat2.marked = 1;
stat2.promoted = 1;
stat2.evacuated = 1;
let stat3 = stat1 + stat2;
assert_eq!(stat3.marked, 2);
assert_eq!(stat3.promoted, 2);
assert_eq!(stat3.evacuated, 2);
}
#[test]
fn test_trace_statistics_add_assign() {
let mut stat1 = TraceStatistics::new();
let mut stat2 = TraceStatistics::new();
stat1.marked = 1;
stat1.promoted = 1;
stat1.evacuated = 1;
stat2.marked = 1;
stat2.promoted = 1;
stat2.evacuated = 1;
stat1 += stat2;
assert_eq!(stat1.marked, 2);
assert_eq!(stat1.promoted, 2);
assert_eq!(stat1.evacuated, 2);
}
}
//! Tracing and marking of live objects.
use crate::gc::statistics::TraceStatistics;
use crate::gc::work_list::WorkList;
use crate::object::ObjectStatus;
use crate::object_pointer::ObjectPointer;
use crate::process::RcProcess;
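// Skips the current loop iteration if the pointer is already marked, or if
// it points to a mature object while the mature generation is not being
// collected. This expands to `continue`, so it can only be used inside a
// loop.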
macro_rules! skip_marked_pointer {
($pointer:expr, $mature:expr) => {
if $pointer.is_marked() {
continue;
}
if !$mature && $pointer.is_mature() {
continue;
}
};
}
/// A single thread tracing through live objects.
pub struct Tracer {
/// The process of which objects are being traced.
process: RcProcess,
}
impl Tracer {
pub fn new(process: RcProcess) -> Self {
Tracer { process }
}
/// Traces through the objects, without moving any.
pub fn trace_without_moving(
&mut self,
mut objects: WorkList,
mature: bool,
) -> TraceStatistics {
let mut stats = TraceStatistics::new();
while let Some(pointer_pointer) = objects.pop() {
let pointer = pointer_pointer.get();
skip_marked_pointer!(pointer, mature);
pointer.mark();
stats.marked += 1;
pointer.get().push_pointers(&mut objects);
}
stats
}
/// Traces through the given pointers, and potentially moves objects around.
pub fn trace_with_moving(
&mut self,
mut objects: WorkList,
mature: bool,
) -> TraceStatistics {
let mut stats = TraceStatistics::new();
while let Some(pointer_pointer) = objects.pop() {
let pointer = pointer_pointer.get_mut();
skip_marked_pointer!(pointer, mature);
match pointer.status() {
ObjectStatus::Resolve => pointer.resolve_forwarding_pointer(),
ObjectStatus::Promote => {
self.promote_mature(pointer);
self.trace_promoted_object(*pointer, &mut objects);
stats.promoted += 1;
}
ObjectStatus::Evacuate => {
self.evacuate(pointer);
stats.evacuated += 1;
}
ObjectStatus::PendingMove => {
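// The object is still in the process of being moved (e.g. by
// another tracer), so we schedule the pointer again and retry it
// later.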
objects.push(pointer_pointer.clone());
continue;
}
_ => {}
}
pointer.mark();
stats.marked += 1;
pointer.get().push_pointers(&mut objects);
}
stats
}
/// Traces a promoted object to see if it should be remembered in the
/// remembered set.
fn trace_promoted_object(
&self,
promoted: ObjectPointer,
objects: &mut WorkList,
) {
// TODO: reuse this WorkList per thread
let mut children = WorkList::new();
let mut remember = false;
promoted.get().push_pointers(&mut children);
while let Some(pointer_pointer) = children.pop() {
if pointer_pointer.get().is_young() {
remember = true;
}
objects.push(pointer_pointer);
}
if remember {
self.process.remember_object(promoted);
}
}
/// Promotes an object to the mature generation.
///
/// The pointer to promote is updated to point to the new location.
fn promote_mature(&self, pointer: &mut ObjectPointer) {
let local_data = self.process.local_data_mut();
let old_obj = pointer.get_mut();
let new_pointer = local_data.allocator.allocate_mature(old_obj.take());
old_obj.forward_to(new_pointer);
pointer.resolve_forwarding_pointer();
}
/// Evacuates a pointer.
///
/// The pointer to evacuate is updated to point to the new location.
fn evacuate(&self, pointer: &mut ObjectPointer) {
// When evacuating an object we must ensure we evacuate the object into
// the same bucket.
let local_data = self.process.local_data_mut();
let bucket = pointer.block_mut().bucket_mut().unwrap();
let old_obj = pointer.get_mut();
let new_obj = old_obj.take();
let (_, new_pointer) =
bucket.allocate(&local_data.allocator.global_allocator, new_obj);
old_obj.forward_to(new_pointer);
pointer.resolve_forwarding_pointer();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::object::Object;
use crate::object_value;
use crate::vm::test::setup;
#[test]
fn test_promote_mature() {
let (_machine, _block, process) = setup();
let tracer = Tracer::new(process.clone());
let mut pointer =
process.allocate_without_prototype(object_value::float(15.0));
let old_address = pointer.raw.raw as usize;
tracer.promote_mature(&mut pointer);
let new_address = pointer.raw.raw as usize;
assert!(old_address != new_address);
assert!(pointer.is_mature());
assert_eq!(pointer.float_value().unwrap(), 15.0);
}
#[test]
fn test_evacuate() {
let (_machine, _block, process) = setup();
let tracer = Tracer::new(process.clone());
let mut pointer =
process.allocate_without_prototype(object_value::float(15.0));
let old_address = pointer.raw.raw as usize;
tracer.evacuate(&mut pointer);
let new_address = pointer.raw.raw as usize;
assert!(old_address != new_address);
assert_eq!(pointer.float_value().unwrap(), 15.0);
}
#[test]
fn test_trace_with_moving_without_mature() {
let (_machine, _block, process) = setup();
let mut tracer = Tracer::new(process.clone());
let young_parent = process.allocate_empty();
let young_child = process.allocate_empty();
young_parent.add_attribute(&process, young_child, young_child);
young_parent.block_mut().set_fragmented();
let mature = process
.local_data_mut()
.allocator
.allocate_mature(Object::new(object_value::none()));
mature.block_mut().set_fragmented();
let mut pointers = WorkList::new();
pointers.push(young_parent.pointer());
pointers.push(mature.pointer());
let stats = tracer.trace_with_moving(pointers, false);
assert_eq!(mature.is_marked(), false);
assert_eq!(stats.marked, 2);
assert_eq!(stats.evacuated, 2);
assert_eq!(stats.promoted, 0);
}
#[test]
fn test_trace_with_moving_with_mature() {
let (_machine, _block, process) = setup();
let mut tracer = Tracer::new(process.clone());
let young_parent = process.allocate_empty();
let young_child = process.allocate_empty();
young_parent.add_attribute(&process, young_child, young_child);
young_parent.block_mut().set_fragmented();
let mature = process
.local_data_mut()
.allocator
.allocate_mature(Object::new(object_value::none()));
mature.block_mut().set_fragmented();
let mut pointers = WorkList::new();
pointers.push(young_parent.pointer());
pointers.push(mature.pointer());
let stats = tracer.trace_with_moving(pointers, true);
assert_eq!(stats.marked, 3);
assert_eq!(stats.evacuated, 3);
assert_eq!(stats.promoted, 0);
}
#[test]
fn test_trace_without_moving_without_mature() {
let (_machine, _block, process) = setup();
let mut tracer = Tracer::new(process.clone());
let young_parent = process.allocate_empty();
let young_child = process.allocate_empty();
young_parent.add_attribute(&process, young_child, young_child);
let mature = process
.local_data_mut()
.allocator
.allocate_mature(Object::new(object_value::none()));
let mut pointers = WorkList::new();
pointers.push(young_parent.pointer());
pointers.push(mature.pointer());
let stats = tracer.trace_without_moving(pointers, false);
assert!(young_parent.is_marked());
assert!(young_child.is_marked());
assert_eq!(mature.is_marked(), false);
assert_eq!(stats.marked, 2);
assert_eq!(stats.evacuated, 0);
assert_eq!(stats.promoted, 0);
}
#[test]
fn test_trace_without_moving_with_mature() {
let (_machine, _block, process) = setup();
let mut tracer = Tracer::new(process.clone());
let young_parent = process.allocate_empty();
let young_child = process.allocate_empty();
young_parent.add_attribute(&process, young_child, young_child);
let mature = process
.local_data_mut()
.allocator
.allocate_mature(Object::new(object_value::none()));
let mut pointers = WorkList::new();
pointers.push(young_parent.pointer());
pointers.push(mature.pointer());
let stats = tracer.trace_without_moving(pointers, true);
assert!(young_parent.is_marked());
assert!(young_child.is_marked());
assert!(mature.is_marked());
assert_eq!(stats.marked, 3);
assert_eq!(stats.evacuated, 0);
assert_eq!(stats.promoted, 0);
}
}
@@ -194,13 +194,6 @@ impl Block {
block
}
 
/// Returns an immutable reference to the header of this block.
#[inline(always)]
pub fn header(&self) -> &BlockHeader {
@@ -255,8 +248,8 @@ impl Block {
}
 
/// Returns true if all lines in this block are available.
pub fn is_empty(&mut self) -> bool {
self.used_lines_bytemap.is_empty()
}
 
/// Returns a pointer to the first address to be used for objects.
@@ -443,10 +436,9 @@ impl Block {
pub fn update_hole_count(&mut self) -> usize {
let mut in_hole = false;
let mut holes = 0;
 
for index in LINE_START_SLOT..LINES_PER_BLOCK {
let is_set = self.used_lines_bytemap.is_set(index);
 
if in_hole && is_set {
in_hole = false;
@@ -462,12 +454,12 @@ impl Block {
}
 
/// Returns the number of marked lines in this block.
pub fn marked_lines_count(&mut self) -> usize {
self.used_lines_bytemap.len()
}
 
/// Returns the number of available lines in this block.
pub fn available_lines_count(&mut self) -> usize {
(LINES_PER_BLOCK - 1) - self.marked_lines_count()
}
 
@@ -486,11 +478,10 @@ impl Block {
let mut new_free = self.end_address();
let mut new_end = self.end_address();
let mut line = self.line_index_of_pointer(old_free);
 
// Find the start of the hole.
while line < LINES_PER_BLOCK {
if !self.used_lines_bytemap.is_set(line) {
new_free = self.pointer_for_hole_starting_at_line(line);
found_hole = true;
 
@@ -502,7 +493,7 @@ impl Block {
 
// Find the end of the hole.
while line < LINES_PER_BLOCK {
if self.used_lines_bytemap.is_set(line) {
new_end = self.pointer_for_hole_starting_at_line(line);
break;
}
@@ -636,7 +627,7 @@ mod tests {
 
assert!(block.is_empty());
 
block.used_lines_bytemap.set(1);
 
assert_eq!(block.is_empty(), false);
}
@@ -682,7 +673,7 @@ mod tests {
let mut block = Block::boxed();
let start = block.start_address();
 
block.used_lines_bytemap.set(2);
 
find_available_hole!(block);
 
@@ -700,7 +691,7 @@ mod tests {
let mut block = Block::boxed();
let start = block.start_address();
 
block.used_lines_bytemap.set(2);
 
find_available_hole!(block);
 
@@ -755,7 +746,7 @@ mod tests {
let mut block = Block::boxed();
 
// First line is used
block.used_lines_bytemap.set(1);
block.recycle();
 
assert_eq!(block.free_pointer(), unsafe {
@@ -766,7 +757,7 @@ mod tests {
 
// first line is available, followed by a used line
block.used_lines_bytemap.reset();
block.used_lines_bytemap.set(2);
block.recycle();
 
assert_eq!(block.free_pointer(), block.start_address());
@@ -783,15 +774,15 @@ mod tests {
let pointer1 = Object::new(ObjectValue::None)
.write_to(block.request_pointer().unwrap());
 
block.used_lines_bytemap.set(1);
 
find_available_hole!(block);
 
let pointer2 = Object::new(ObjectValue::None)
.write_to(block.request_pointer().unwrap());
 
block.used_lines_bytemap.set(2);
block.used_lines_bytemap.set(3);
 
find_available_hole!(block);
 
@@ -808,7 +799,7 @@ mod tests {
let mut block = Block::boxed();
let start = block.start_address();
 
block.used_lines_bytemap.set(1);
 
find_available_hole!(block);
 
@@ -821,8 +812,8 @@ mod tests {
let mut block = Block::boxed();
let start = block.start_address();
 
block.used_lines_bytemap.set(1);
block.used_lines_bytemap.set(3);
 
find_available_hole!(block);
 
@@ -835,7 +826,7 @@ mod tests {
let mut block = Block::boxed();
 
for index in 1..LINES_PER_BLOCK {
block.used_lines_bytemap.set(index);
}
 
find_available_hole!(block);
@@ -848,8 +839,8 @@ mod tests {
fn test_block_find_available_hole_recycle() {
let mut block = Block::boxed();
 
block.used_lines_bytemap.set(1);
block.used_lines_bytemap.set(2);
 
find_available_hole!(block);
 
@@ -862,9 +853,9 @@ mod tests {
fn test_block_find_available_hole_pointer_range() {
let mut block = Block::boxed();
 
block.used_lines_bytemap.set(1);
block.used_lines_bytemap.set(2);
block.used_lines_bytemap.set(LINES_PER_BLOCK - 1);
 
find_available_hole!(block);
 
@@ -894,8 +885,8 @@ mod tests {
block.set_end_pointer(start_addr);
 
block.set_bucket(&mut bucket as *mut Bucket);
block.used_lines_bytemap.set(1);
block.marked_objects_bytemap.set(1);
 
block.reset();
 
@@ -904,8 +895,8 @@ mod tests {
assert!(block.free_pointer() == block.start_address());
assert!(block.end_pointer() == block.end_address());
assert!(block.bucket().is_none());
assert!(block.used_lines_bytemap.is_empty());
assert!(block.marked_objects_bytemap.is_empty());
}
 
#[test]
@@ -926,9 +917,9 @@ mod tests {
fn test_block_update_hole_count() {
let mut block = Block::boxed();
 
block.used_lines_bytemap.set(1);
block.used_lines_bytemap.set(3);
block.used_lines_bytemap.set(10);
 
block.update_hole_count();
 
@@ -941,7 +932,7 @@ mod tests {
 
assert_eq!(block.marked_lines_count(), 0);
 
block.used_lines_bytemap.set(1);
 
assert_eq!(block.marked_lines_count(), 1);
}
@@ -952,7 +943,7 @@ mod tests {
 
assert_eq!(block.available_lines_count(), LINES_PER_BLOCK - 1);
 
block.used_lines_bytemap.set(1);
 
assert_eq!(block.available_lines_count(), LINES_PER_BLOCK - 2);
}
@@ -971,23 +962,23 @@ mod tests {
fn test_reset_mark_bytemaps() {
let mut block = Block::boxed();
 
block.used_lines_bytemap.set(1);
block.marked_objects_bytemap.set(1);
block.reset_mark_bytemaps();
 
assert_eq!(block.used_lines_bytemap.is_set(1), false);
assert_eq!(block.marked_objects_bytemap.is_set(1), false);
}
 
#[test]
fn test_reset_marked_objects_bytemap() {
let mut block = Block::boxed();
 
block.used_lines_bytemap.set(1);
block.marked_objects_bytemap.set(1);
block.reset_marked_objects_bytemap();
 
assert_eq!(block.used_lines_bytemap.is_set(1), true);
assert_eq!(block.marked_objects_bytemap.is_set(1), false);
}
}
@@ -9,6 +9,7 @@ use std::ops::Drop;
use std::ops::Index;
 
/// A linked list of blocks.
#[derive(Default)]
pub struct BlockList {
/// The first (owned) block in the list, if any.
pub head: Option<Box<Block>>,
@@ -6,14 +6,13 @@ use crate::deref_pointer::DerefPointer;
use crate::immix::block::Block;
use crate::immix::block_list::BlockList;
use crate::immix::global_allocator::RcGlobalAllocator;
use crate::immix::histogram::MINIMUM_BIN;
use crate::immix::histograms::Histograms;
use crate::object::Object;
use crate::object_pointer::ObjectPointer;
use crate::vm::state::State;
use parking_lot::Mutex;
use std::cell::UnsafeCell;
 
macro_rules! lock_bucket {
($bucket: expr) => {
@@ -46,16 +45,6 @@ pub struct Bucket {
 
/// The age of the objects in the current bucket.
pub age: i8,
}
 
unsafe impl Send for Bucket {}
@@ -72,7 +61,6 @@ impl Bucket {
current_block: DerefPointer::null(),
age,
lock: UnsafeCell::new(Mutex::new(())),
}
}
 
@@ -186,104 +174,95 @@ impl Bucket {
///
/// Recyclable blocks are scheduled for re-use by the allocator, empty
/// blocks are to be returned to the global pool, and full blocks are kept.
pub fn reclaim_blocks(
&mut self,
state: &State,
histograms: &mut Histograms,
) {
let start = std::time::Instant::now();
let mut to_release = BlockList::new();
// We perform this work sequentially, as performing this in parallel
// would require multiple passes over the list of input blocks. We found
// that performing this work in parallel using Rayon ended up being
// about 20% slower, likely due to:
//
// 1. The overhead of distributing work across threads.
// 2. Having to run multiple passes over the input blocks.
for mut block in self.blocks.drain() {
if block.is_empty() {
block.reset();
to_release.push_front(block);
} else {
let holes = block.update_hole_count();
// Clearing the fragmentation status is done so a block does
// not stay fragmented until it has been evacuated entirely.
// This ensures we don't keep evacuating objects when this
// may no longer be needed.
block.clear_fragmentation_status();
if holes > 0 {
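// Only blocks with at least MINIMUM_BIN holes are recorded in the
// "marked" histogram, as blocks with fewer holes are not considered
// fragmented (see histogram.rs).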
if holes >= MINIMUM_BIN {
histograms.marked.increment(
holes,
block.marked_lines_count() as u32,
);
}
block.recycle();
}
 
self.blocks.push_front(block);
}
}
 
state.global_allocator.add_blocks(&mut to_release);
self.reset_current_block();
println!("reclaimed in {:?}", start.elapsed());
}
 
/// Prepares this bucket for a collection.
///
/// Returns true if evacuation is needed for this bucket.
pub fn prepare_for_collection(
&mut self,
histograms: &mut Histograms,
evacuate: bool,
) {
let mut required: isize = 0;
let mut available: isize = 0;
for block in self.blocks.iter_mut() {
let holes = block.holes();
 
// We ignore blocks with only a single hole, as those are not
// fragmented and not worth evacuating. This also ensures we ignore
// blocks added since the last collection, which will have a hole
// count of 1.
if evacuate && holes >= MINIMUM_BIN {
let block_count = block.available_lines_count() as u32;
 
histograms.available.increment(holes, block_count);
available += block_count as isize;
};
block.reset_mark_bytemaps();
}
 
if available > 0 {
let mut iter = histograms.marked.iter();
let mut min_bin = 0;
 
let start = std::time::Instant::now();
while available > required {
match iter.next() {
// Bucket 1 refers to blocks with only a single hole. Blocks
// with just one hole aren't fragmented, so we ignore those
// here.
Some(bin) => {
required += histograms.marked.get(bin) as isize;
available -= histograms.available.get(bin) as isize;
 
@@ -294,24 +273,14 @@ impl Bucket {
}
 
if min_bin > 0 {
for block in self.blocks.iter_mut() {
if block.holes() >= min_bin {
block.set_fragmented();
}
}
}
println!("calculated fragmentation in {:?}", start.elapsed());
}
}
}
@@ -451,14 +420,14 @@ mod tests {
let state = State::with_rc(Config::new(), &[]);
let global_alloc = global_allocator();
let mut bucket = Bucket::new();
let mut histos = Histograms::new();
 
let (_, pointer) =
bucket.allocate(&global_alloc, Object::new(object_value::none()));
 
pointer.mark();
 
bucket.reclaim_blocks(&state, &mut histos);
 
assert_eq!(bucket.blocks.len(), 1);
 
@@ -483,25 +452,23 @@ mod tests {
let block2 = Block::boxed();
let mut block3 = Block::boxed();
let state = State::with_rc(Config::new(), &[]);
let mut histos = Histograms::new();
 
block1.used_lines_bytemap.set(LINES_PER_BLOCK - 1);
 
block3.used_lines_bytemap.set(2);
 
bucket.add_block(block1);
bucket.add_block(block2);
bucket.add_block(block3);
bucket.reclaim_blocks(&state, &mut histos);
 
assert_eq!(bucket.blocks.len(), 2);
 
assert_eq!(bucket.blocks[0].holes(), 1);
assert_eq!(bucket.blocks[1].holes(), 2);
 
assert_eq!(histos.marked.get(1), 0); // Bucket 1 should not be set
assert_eq!(histos.marked.get(2), 1);
}
 
@@ -509,15 +476,15 @@ mod tests {
fn test_reclaim_blocks_full() {
let mut bucket = Bucket::new();
let mut block = Block::boxed();
let mut histos = Histograms::new();
let state = State::with_rc(Config::new(), &[]);
 
for i in 0..LINES_PER_BLOCK {
block.used_lines_bytemap.set(i);
}
 
bucket.add_block(block);
bucket.reclaim_blocks(&state, &mut histos);
 
assert_eq!(bucket.blocks.len(), 1);
assert_eq!(bucket.current_block.is_null(), false);
@@ -526,23 +493,19 @@ mod tests {
#[test]
fn test_prepare_for_collection_without_evacuation() {
let mut bucket = Bucket::new();
let mut histos = Histograms::new();
 
bucket.add_block(Block::boxed());
bucket.current_block().unwrap().used_lines_bytemap.set(1);
 
bucket.prepare_for_collection(&mut histos, false);
 
// No evacuation needed means the available histogram is not updated.
assert_eq!(histos.available.get(1), 0);
 
let mut block = bucket.current_block().unwrap();
 
assert!(block.marked_objects_bytemap.is_empty());
}
 
#[test]
@@ -550,10 +513,10 @@ mod tests {
let mut bucket = Bucket::new();
let block1 = Block::boxed();
let mut block2 = Block::boxed();
let mut histos = Histograms::new();
 
block2.used_lines_bytemap.set(1);
block2.used_lines_bytemap.set(3);
block2.update_hole_count();
block2.set_fragmented();
 
@@ -561,47 +524,13 @@ mod tests {
bucket.add_block(block2);
histos.marked.increment(2, 1);
 
bucket.prepare_for_collection(&mut histos, true);
 
assert_eq!(histos.available.get(2), (LINES_PER_BLOCK - 3) as u32);
 
let mut block = bucket.current_block().unwrap();
 
assert!(block.is_fragmented());
assert!(block.marked_objects_bytemap.is_empty());
}
}
@@ -2,11 +2,6 @@
//!
//! Bytemaps are used for marking live objects as well as marking which lines
//! are in use. An ObjectMap is used for marking objects while tracing.
use crate::immix::block::{LINES_PER_BLOCK, OBJECTS_PER_BLOCK};
use std::mem;
use std::ptr;
@@ -23,13 +18,14 @@ pub struct LineMap {
}
 
pub trait Bytemap {
fn values(&self) -> &[AtomicU8];
fn values_mut(&mut self) -> &mut [AtomicU8];
fn reset(&mut self);
 
/// Sets the given index in the bytemap.
fn set(&mut self, index: usize) {
self.values()[index].store(1, Ordering::Release);
}
 
/// Unsets the given index in the bytemap.
@@ -38,32 +34,50 @@ pub trait Bytemap {
}
 
/// Returns `true` if a given index is set.
fn is_set(&self, index: usize) -> bool {
self.values()[index].load(Ordering::Acquire) == 1
}
 
/// Returns true if the bytemap is empty.
///
/// The number of values in a bytemap is a power of two, and thus a
/// multiple of the word size of the current architecture. Since we store
/// bytes in the bytemap, this allows us to read multiple bytes at once.
/// This in turn allows us to greatly speed up checking if a bytemap is
/// empty.
///
/// The downside of this is that this method can not be used safely while
/// the bytemap is also being modified.
fn is_empty(&mut self) -> bool {
let mut offset = 0;
while offset < self.values().len() {
// The cast to *const usize here is important so that we read a
// whole word at a time rather than a single byte.
let value = unsafe {
let ptr = self.values().as_ptr().add(offset) as *const usize;
*ptr
};
if value > 0 {
return false;
}
offset += mem::size_of::<usize>();
}
 
true
}
 
/// Returns the number of indexes set in the bytemap.
///
/// This method can not be used if the bytemap is modified concurrently.
fn len(&mut self) -> usize {
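// Every entry is either 0 or 1, so summing the bytes yields the number
// of set indexes.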
self.values_mut()
.iter_mut()
.map(|x| *x.get_mut() as usize)
.sum()
}
}
 
@@ -95,8 +109,9 @@ impl Bytemap for ObjectMap {
&self.values
}
 
#[inline(always)]
fn values_mut(&mut self) -> &mut [AtomicU8] {
&mut self.values
}
 
fn reset(&mut self) {
@@ -112,8 +127,9 @@ impl Bytemap for LineMap {
&self.values
}
 
#[inline(always)]
fn values_mut(&mut self) -> &mut [AtomicU8] {
&mut self.values
}
 
fn reset(&mut self) {
@@ -132,50 +148,50 @@ mod tests {
fn test_object_map_set() {
let mut object_map = ObjectMap::new();
 
object_map.set(1);
 
assert!(object_map.is_set(1));
}
 
#[test]
fn test_object_map_unset() {
let mut object_map = ObjectMap::new();
 
object_map.set(1);
object_map.unset(1);
 
assert_eq!(object_map.is_set(1), false);
}
 
#[test]
fn test_object_map_is_empty() {
let mut object_map = ObjectMap::new();
 
assert_eq!(object_map.is_empty(), true);
 
object_map.set(1);
 
assert_eq!(object_map.is_empty(), false);
}
 
#[test]
fn test_object_map_reset() {
let mut object_map = ObjectMap::new();
 
object_map.set(1);
object_map.reset();
 
assert_eq!(object_map.is_set(1), false);
}
 
#[test]
fn test_object_map_len() {
let mut object_map = ObjectMap::new();
 
object_map.set(1);
object_map.set(3);
 
assert_eq!(object_map.len(), 2);
}
 
#[test]
@@ -189,50 +205,55 @@ mod tests {
fn test_line_map_set() {
let mut line_map = LineMap::new();
 
line_map.set(1);
 
assert!(line_map.is_set(1));
}
 
#[test]
fn test_line_map_unset() {
let mut line_map = LineMap::new();
 
line_map.set(1);
line_map.unset(1);
 
assert_eq!(line_map.is_set(1), false);
}
 
#[test]
fn test_line_map_is_empty() {
let mut line_map = LineMap::new();
 
assert_eq!(line_map.is_empty(), true);
 
line_map.set(1);
assert_eq!(line_map.is_empty(), false);
line_map.unset(1);
line_map.set(60);
 
assert_eq!(line_map.is_empty(), false);
}
 
#[test]
fn test_line_map_reset() {
let mut line_map = LineMap::new();
 
line_map.set(1);
line_map.reset();
 
assert_eq!(line_map.is_set(1), false);
}
 
#[test]
fn test_line_map_len() {
let mut line_map = LineMap::new();
 
line_map.set(1);
line_map.set(3);
 
assert_eq!(line_map.len(), 2);
}
 
#[test]
@@ -6,26 +6,26 @@
use crate::arc_without_weak::ArcWithoutWeak;
use crate::immix::block::Block;
use crate::immix::block_list::BlockList;
use parking_lot::Mutex;
 
pub type RcGlobalAllocator = ArcWithoutWeak<GlobalAllocator>;
 
/// Structure used for storing the state of the global allocator.
pub struct GlobalAllocator {
blocks: Mutex<BlockList>,
}
 
impl GlobalAllocator {
/// Creates a new GlobalAllocator with a number of blocks pre-allocated.
pub fn with_rc() -> RcGlobalAllocator {
ArcWithoutWeak::new(GlobalAllocator {
blocks: Mutex::new(BlockList::new()),
})
}
 
/// Requests a new free block from the pool
pub fn request_block(&self) -> Box<Block> {
if let Some(block) = self.blocks.lock().pop_front() {
block
} else {
Block::boxed()
@@ -34,26 +34,27 @@ impl GlobalAllocator {
 
/// Adds a block to the pool so it can be re-used.
pub fn add_block(&self, block: Box<Block>) {
self.blocks.lock().push_front(block);
}
 
/// Adds multiple blocks to the global allocator.
pub fn add_blocks(&self, to_add: &mut BlockList) {
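// Appending the whole list while holding the lock once is cheaper than
// re-locking for every block individually.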
let mut blocks = self.blocks.lock();
blocks.append(to_add);
}
}
 
#[cfg(test)]
mod tests {
use super::*;
use crate::immix::block_list::BlockList;
 
#[test]
fn test_new() {
let alloc = GlobalAllocator::with_rc();
 
assert!(alloc.blocks.lock().pop_front().is_none());
}
 
#[test]
@@ -64,7 +65,7 @@ mod tests {
alloc.add_block(block);
alloc.request_block();
 
assert!(alloc.blocks.pop().is_err());
assert!(alloc.blocks.lock().pop_front().is_none());
}
 
#[test]
@@ -74,6 +75,18 @@ mod tests {
 
alloc.add_block(block);
 
assert!(alloc.blocks.pop().is_ok());
assert!(alloc.blocks.lock().pop_front().is_some());
}

#[test]
fn test_add_blocks() {
let alloc = GlobalAllocator::with_rc();
let mut blocks = BlockList::new();

blocks.push_front(alloc.request_block());
blocks.push_front(alloc.request_block());
alloc.add_blocks(&mut blocks);

assert_eq!(alloc.blocks.lock().len(), 2);
}
}
@@ -3,22 +3,25 @@
//! A Histogram is used to track the distribution of marked and available lines
//! across Immix blocks. Each bin represents the number of holes with the values
//! representing the number of marked lines.
//!
//! Histograms are of a fixed size and use atomic operations for incrementing
//! bucket values, allowing concurrent use of the same histogram.
use crate::chunk::Chunk;
use std::sync::atomic::{AtomicU16, Ordering};
 
const DEFAULT_VALUE: u16 = 0;
/// The minimum bin number that we care about when obtaining the most fragmented
/// bins.
///
/// Bins 0 and 1 are not interesting, because blocks with 0 or 1 holes are not
/// used for calculating fragmentation statistics.
pub const MINIMUM_BIN: usize = 2;
 
pub struct Histogram {
values: Chunk<AtomicU16>,
// We use a u32 as this allows for 4 294 967 295 lines per bucket, which
// equals roughly 512 GB of lines.
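// That estimate assumes the usual Immix line size of 128 bytes:
// 4 294 967 295 lines * 128 bytes per line is roughly 512 GiB.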
values: Chunk<u32>,
}
 
/// Iterator for traversing the most fragmented bins in a histogram.
pub struct HistogramIterator<'a> {
histogram: &'a Histogram,
index: isize,
index: usize,
}
 
impl Histogram {
@@ -32,29 +35,29 @@ impl Histogram {
///
/// Bounds checking is not performed, as the garbage collector never uses an
/// out of bounds index.
pub fn increment(&self, index: usize, value: u16) {
self.values[index].fetch_add(value, Ordering::Release);
pub fn increment(&mut self, index: usize, value: u32) {
debug_assert!(index < self.values.len());
self.values[index] += value;
}
 
/// Returns the value for the given bin.
///
/// Bounds checking is not performed, as the garbage collector never uses an
/// out of bounds index.
pub fn get(&self, index: usize) -> u16 {
self.values[index].load(Ordering::Acquire)
pub fn get(&self, index: usize) -> u32 {
debug_assert!(index < self.values.len());
self.values[index]
}
 
/// Returns the most fragmented bin.
pub fn most_fragmented_bin(&self) -> usize {
let mut most_fragmented = 0;
let mut largest_value = 0;
let mut most_fragmented = MINIMUM_BIN;
 
for bin in 0..self.values.len() {
let value = self.values[bin].load(Ordering::Acquire);
if value > largest_value {
largest_value = value;
most_fragmented = bin;
for index in MINIMUM_BIN..self.values.len() {
if self.values[index] > self.values[most_fragmented] {
most_fragmented = index;
}
}
 
@@ -65,16 +68,14 @@ impl Histogram {
/// descending order.
pub fn iter(&self) -> HistogramIterator {
HistogramIterator {
index: self.most_fragmented_bin() as isize,
index: self.most_fragmented_bin(),
histogram: self,
}
}
 
/// Removes all values from the histogram.
pub fn reset(&mut self) {
for index in 0..self.values.len() {
self.values[index].store(DEFAULT_VALUE, Ordering::Release);
}
self.values.reset();
}
}
 
@@ -82,9 +83,12 @@ impl<'a> Iterator for HistogramIterator<'a> {
type Item = usize;
 
fn next(&mut self) -> Option<usize> {
while self.index >= 0 {
let index = self.index as usize;
let value = self.histogram.get(index as usize);
// When traversing the buckets we never care about bins 0 and 1, because
// blocks with 0 or 1 holes are not interesting for calculating
// fragmentation statistics.
while self.index >= MINIMUM_BIN {
let index = self.index;
let value = self.histogram.get(index);
 
self.index -= 1;
 
@@ -110,7 +114,7 @@ mod tests {
 
#[test]
fn test_increment() {
let histo = Histogram::new(1);
let mut histo = Histogram::new(1);
 
histo.increment(0, 10);
 
@@ -119,7 +123,7 @@ mod tests {
 
#[test]
fn test_increment_successive() {
let histo = Histogram::new(1);
let mut histo = Histogram::new(1);
 
histo.increment(0, 5);
histo.increment(0, 5);
@@ -129,29 +133,31 @@ mod tests {
 
#[test]
fn test_most_fragmented_bin() {
let histo = Histogram::new(4);
let mut histo = Histogram::new(4);
 
histo.increment(0, 5);
histo.increment(1, 7);
histo.increment(2, 3);
histo.increment(3, 2);
 
assert_eq!(histo.most_fragmented_bin(), 1);
assert_eq!(histo.most_fragmented_bin(), 2);
}
 
#[test]
fn test_iter() {
let histo = Histogram::new(3);
let mut histo = Histogram::new(4);
 
histo.increment(0, 10);
histo.increment(1, 20);
histo.increment(2, 25);
histo.increment(3, 30);
assert_eq!(histo.most_fragmented_bin(), 3);
 
let mut iter = histo.iter();
 
assert_eq!(iter.next().unwrap(), 3);
assert_eq!(iter.next().unwrap(), 2);
assert_eq!(iter.next().unwrap(), 1);
assert_eq!(iter.next().unwrap(), 0);
assert!(iter.next().is_none());
}
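
// An illustrative sketch, not part of this commit: bins are indexed by a
// block's hole count and hold the number of marked lines, so the iterator
// visits hole counts from the most fragmented bin down towards MINIMUM_BIN.
// The hole and line counts below are made up for the example.
#[test]
fn test_most_fragmented_bin_walk_example() {
    let mut histo = Histogram::new(8);

    // Blocks with two holes contributed 40 marked lines in total, while a
    // single block with five holes contributed 10 marked lines.
    histo.increment(2, 40);
    histo.increment(5, 10);

    // Bin 2 holds the most marked lines, so the walk starts there.
    assert_eq!(histo.most_fragmented_bin(), 2);
    assert_eq!(histo.iter().next(), Some(2));
}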
 
@@ -113,7 +113,7 @@ impl LocalAllocator {
 
for bucket in &mut self.young_generation {
bucket.prepare_for_collection(
&self.young_histograms,
&mut self.young_histograms,
self.evacuate_young,
);
 
@@ -124,7 +124,7 @@ impl LocalAllocator {
 
if mature {
self.mature_generation.prepare_for_collection(
&self.mature_histograms,
&mut self.mature_histograms,
self.evacuate_mature,
);
 
@@ -141,14 +141,14 @@ impl LocalAllocator {
self.young_histograms.reset();
 
for bucket in &mut self.young_generation {
bucket.reclaim_blocks(state, &self.young_histograms);
bucket.reclaim_blocks(state, &mut self.young_histograms);
}
 
if mature {
self.mature_histograms.reset();
 
self.mature_generation
.reclaim_blocks(state, &self.mature_histograms);
.reclaim_blocks(state, &mut self.mature_histograms);
}
}
 
@@ -265,10 +265,9 @@ impl ObjectPointer {
 
let object_index = block.object_index_of_pointer(pointer);
let line_index = block.line_index_of_pointer(pointer);
let mark_value = block.mark_value();
 
block.marked_objects_bytemap.set(object_index, mark_value);
block.used_lines_bytemap.set(line_index, mark_value);
block.marked_objects_bytemap.set(object_index);
block.used_lines_bytemap.set(line_index);
}
 
/// Unmarks the current object.
@@ -300,9 +299,8 @@ impl ObjectPointer {
let header = block_header_of(pointer);
let block = header.block_mut();
let index = block.object_index_of_pointer(pointer);
let mark_value = block.mark_value();
 
block.marked_objects_bytemap.is_set(index, mark_value)
block.marked_objects_bytemap.is_set(index)
}
 
/// Marks the object this pointer points to as being remembered in a
@@ -1007,29 +1005,24 @@ mod tests {
fn test_object_pointer_mark() {
let mut allocator = local_allocator();
let pointer = allocator.allocate_empty();
let mark_value = pointer.block().mark_value();
 
pointer.mark();
 
assert!(pointer.block().marked_objects_bytemap.is_set(4, mark_value));
assert!(pointer.block().used_lines_bytemap.is_set(1, mark_value));
assert!(pointer.block().marked_objects_bytemap.is_set(4));
assert!(pointer.block().used_lines_bytemap.is_set(1));
}
 
#[test]
fn test_object_pointer_unmark() {
let mut allocator = local_allocator();
let pointer = allocator.allocate_empty();
let mark_value = pointer.block().mark_value();
 
pointer.mark();
pointer.unmark();
 
assert_eq!(
pointer.block().marked_objects_bytemap.is_set(4, mark_value),
false
);
assert_eq!(pointer.block().marked_objects_bytemap.is_set(4), false);
 
assert!(pointer.block().used_lines_bytemap.is_set(1, mark_value));
assert!(pointer.block().used_lines_bytemap.is_set(1));
}
 
#[test]
@@ -528,6 +528,18 @@ impl Process {
pointers
}
 
pub fn remembered_pointers(&self) -> WorkList {
self.local_data_mut().allocator.remembered_pointers()
}

pub fn prune_remembered_set(&self) {
self.local_data_mut().allocator.prune_remembered_objects();
}

pub fn remember_object(&self, pointer: ObjectPointer) {
self.local_data_mut().allocator.remember_object(pointer);
}

pub fn waiting_for_message(&self) {
self.waiting_for_message.store(true, Ordering::Release);
}
//! Thread pool for performing garbage collection.
use crate::arc_without_weak::ArcWithoutWeak;
use crate::gc::collection::Collection;
use crate::scheduler::garbage_collection_worker::GarbageCollectionWorker;
use crate::scheduler::pool::Pool;
use crate::scheduler::pool_state::PoolState;
use crate::scheduler::queue::RcQueue;
use crate::scheduler::worker::Worker;
use std::thread;

/// A pool of threads for performing garbage collections.
pub struct GarbageCollectionPool {
    pub state: ArcWithoutWeak<PoolState<Collection>>,

    /// The base name of every thread in this pool.
    name: String,
}

impl GarbageCollectionPool {
    pub fn new(name: String, threads: usize) -> Self {
        assert!(
            threads > 0,
            "A GarbageCollectionPool requires at least a single thread"
        );

        Self {
            name,
            state: ArcWithoutWeak::new(PoolState::new(threads)),
        }
    }
}

impl Pool<Collection, GarbageCollectionWorker> for GarbageCollectionPool {
    fn state(&self) -> &ArcWithoutWeak<PoolState<Collection>> {
        &self.state
    }

    fn spawn_thread<F>(
        &self,
        id: usize,
        queue: RcQueue<Collection>,
        callback: ArcWithoutWeak<F>,
    ) -> thread::JoinHandle<()>
    where
        F: Fn(&mut GarbageCollectionWorker, Collection) + Send + 'static,
    {
        let state = self.state.clone();

        thread::Builder::new()
            .name(format!("{} {}", self.name, id))
            .spawn(move || {
                GarbageCollectionWorker::new(queue, state).run(&*callback);
            })
            .unwrap()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::gc::collection::Collection;
    use crate::vm::test::setup;
    use parking_lot::Mutex;

    #[test]
    #[should_panic]
    fn test_new_with_zero_threads() {
        GarbageCollectionPool::new("test".to_string(), 0);
    }

    #[test]
    fn test_spawn_thread() {
        let (_machine, _block, process) = setup();
        let pool = GarbageCollectionPool::new("test".to_string(), 1);
        let number = ArcWithoutWeak::new(Mutex::new(0));
        let number_copy = number.clone();

        let callback = ArcWithoutWeak::new(
            move |worker: &mut GarbageCollectionWorker, _| {
                *number_copy.lock() = 10;

                worker.state().terminate();
            },
        );

        let thread =
            pool.spawn_thread(0, pool.state.queues[0].clone(), callback);

        pool.schedule(Collection::new(process.clone()));
        thread.join().unwrap();

        assert_eq!(*number.lock(), 10);
    }
}
@@ -14,13 +14,16 @@ impl<T> JoinList<T> {
 
/// Waits for all the threads to finish.
///
/// The return values of the threads are ignored.
pub fn join(self) -> ThreadResult<()> {
/// The return value is a Vec containing the return values of each joined
/// thread.
pub fn join(self) -> ThreadResult<Vec<T>> {
let mut values = Vec::with_capacity(self.handles.len());
for handle in self.handles {
handle.join()?;
values.push(handle.join()?);
}
 
Ok(())
Ok(values)
}
}
 
@@ -42,8 +45,12 @@ mod tests {
let handle2 =
thread::spawn(move || number2.fetch_add(1, Ordering::SeqCst));
 
JoinList::new(vec![handle1, handle2]).join().unwrap();
let values = JoinList::new(vec![handle1, handle2]).join().unwrap();
 
assert_eq!(number.load(Ordering::SeqCst), 2);
assert_eq!(values.len(), 2);
assert!(values.contains(&0));
assert!(values.contains(&1));
}
}
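
// An illustrative sketch, not part of this commit: because join() now returns
// one value per thread, a caller can combine the per-thread results after the
// threads have finished. The idea of threads returning counts is an assumption
// made for this example.
fn example_sum_thread_results() -> usize {
    let handles: Vec<_> = (0..4_usize)
        .map(|i| std::thread::spawn(move || i * 2))
        .collect();

    // join() propagates panics as errors and yields the return value of every
    // joined thread, in the order the handles were stored.
    JoinList::new(handles).join().unwrap().into_iter().sum()
}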
//! Task scheduling and execution using work stealing.

pub mod garbage_collection_pool;
pub mod garbage_collection_worker;
pub mod join_list;
pub mod park_group;
pub mod pool;
pub mod pool_state;
pub mod process_pool;
pub mod process_scheduler;
use crate::arc_without_weak::ArcWithoutWeak;
use crate::scheduler::join_list::JoinList;
use crate::scheduler::pool_state::PoolState;
use crate::scheduler::queue::RcQueue;
use crate::scheduler::worker::Worker;
use std::thread;

pub trait Pool<T: Send, W: Worker<T>> {
    fn state(&self) -> &ArcWithoutWeak<PoolState<T>>;

    /// Spawns a single OS thread that is to consume the given queue.
    fn spawn_thread<F>(
        &self,
        id: usize,
        queue: RcQueue<T>,
        callback: ArcWithoutWeak<F>,
    ) -> thread::JoinHandle<()>
    where
        F: Fn(&mut W, T) + Send + 'static;

    /// Schedules a job onto the global queue.
    fn schedule(&self, job: T) {
        self.state().push_global(job);
    }

    /// Informs this pool it should terminate as soon as possible.
    fn terminate(&self) {
        self.state().terminate();
    }

    /// Starts the pool, without blocking the calling thread.
    fn start<F>(&self, callback: F) -> JoinList<()>
    where
        F: Fn(&mut W, T) + Send + 'static,
    {
        let rc_callback = ArcWithoutWeak::new(callback);

        self.spawn_threads_for_range(0, &rc_callback)
    }

    /// Spawns OS threads for a range of queues, starting at the given position.
    fn spawn_threads_for_range<F>(
        &self,
        start_at: usize,
        callback: &ArcWithoutWeak<F>,
    ) -> JoinList<()>
    where
        F: Fn(&mut W, T) + Send + 'static,
    {
        let handles = self.state().queues[start_at..]
            .iter()
            .enumerate()
            .map(|(index, queue)| {
                // When using enumerate() with a range start > 0, the first
                // index is still 0.
                let worker_id = start_at + index;

                self.spawn_thread(worker_id, queue.clone(), callback.clone())
            })
            .collect();

        JoinList::new(handles)
    }
}
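
// An illustrative sketch, not part of this commit: the expected lifecycle of a
// Pool implementation is start() with a callback, schedule() jobs onto the
// global queue, then terminate() and join the returned JoinList. The job type
// String and the printing callback are assumptions made for this example.
fn example_pool_lifecycle<P, W>(pool: &P)
where
    P: Pool<String, W>,
    W: Worker<String>,
{
    // Every worker thread runs this callback for each job it picks up.
    let threads = pool.start(|_worker: &mut W, job: String| {
        println!("processing {}", job);
    });

    pool.schedule("first".to_string());
    pool.schedule("second".to_string());

    // Shut the pool down and wait for all of its threads to finish.
    pool.terminate();
    threads.join().unwrap();
}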