Skip to Content

Concurrency

2019-01-21·Operating System

概念定义

ConceptDefinition
threadseparate process that 共享代码、共享数据、拥有独立栈堆
multi-threadparallelism and can avoid blocking program progress due to slow I/O
race conditionthe results depend on the timing execution of the code
critical sectiona piece of code that accesses a shared resource and must not be concurrently executed by more than one thread
POSIXPortable Operating System Interface
synchronization primitives

并发的问题

LackLocationReasonExampleSolution
顺序性编译器编译器优化sum 累加 -O1 -O2volatile
内存屏障
原子性操作系统中断随时到来sum 累加mutual lock
32位处理64位数(32-bit mov)
可见性Cache缓存双写 x,y指令集
乱序执行
  • 内存屏障
    • asm volatile("" ::: "memory")
    • __sync_synchronize()

Lock

Spin Lock 实现方法具体描述代码实现
atomic-exchangewhile (atomic_xchg(&locked, 1));asm volatile ("lock xchg %0, %1":"+m"(*addr), "=a"(result) : "1"(newval) : "cc");
compare-and-exchangewhile (comp_swap(&locked, 0, 1));lock cmpxchg1 %2, %1
load-linked and store-conditionalwhile(1) { while (LL(&lock)); if (SC(&lock, 1)) return; }(ARM, RISC-V, MIPS)
fetch-and-addint t=fetch-add(&lock->ticket); while (t!=&lock->turn);
  • spinlock: 持有锁时中断,其它进程等待浪费时间
    • Evaluating
      • Correctness
        • safety: 任意时刻至多一个
        • liveness: 至少一个
      • Fairness
      • Performance
  • spinlock+cli: (只有其它cpu可能在同一临界区spin)
    • 中断丢失可能严重损害I/O性能
    • 互斥的代码本身就需要等待中断
  • mutex lock
    • yield
    • sleeping with queue

并发数据结构

counter

  • single lock: not scalable
  • approximate counter: accuracy/performance trade-off

list

  • hand-over-hand locking

queue

hash table

Condition Variables

  • condition variable: an explicit queue that threads can put themselves on when some condition is waiting other thread
    • wait: release the lock and put the calling thread to sleep
    • wakeup: re-acquire the lock
    • signal: wake another thread
    • broadcast: wake all other threads
  • Always hold the lock while signaling
//wait
lock(&m)
while (done==0) wait(&c,&m)
unlock(&m)
//signal
lock(&m)
done=1
signal(&c)
unlock(&m)
SemanticsDefinitionSignaledSignaler
Hoarethe signaled thread immediately takes over the monitor, and the signaler is suspended可认为状态不变在 signaled thread 返回后继续
Mesathe signaled thread transitions back to the ready state状态可能改变:must examine the monitor state againthe signaler continues until it exits the monitor or waits

Semaphores

  • semaphore: an object with an integar value that we can manipulate with two routines: sem_wait() and sem_post()
    • sem_init(&s, 0, s)
    • P: sem_wait : s--, if s<0 wait
      • the negative value equals waiting threads
    • V: sem_post : s++, if exists waiting threads, wake one
  • semaphore as lock (binary semaphore)
    • sem_init(&s, 0, 1)
    • sem_wait() ... sem_post()
  • semaphore for ordering (signal variable)
    • sem_init(&s, 0, 0)
  • Implement of Semaphores
void zem_wait(){
  lock(&s->lock);
  while (s->value<=0) wait(&s->cond, &s->lock);
  s->value--;
  unlock(&s->lock);
}
void zem_post(){
  lock(&s->lock);
  s->value++;
  signal(&s->cond);
  unlock(&s->lock);
}

实际应用

Producer/Consumer Problem

//Lock Version
void *producer() {
  for (i=0; i<loops; ++i){
    lock(&m);
    while (count == MAX) wait(&empty, &m);
    put(i);
    signal(&fill);
    unlock(&m);
  }
}
 
void *consumer() {
  for (i=0; i<loops; ++i){
    lock(&m);
    while (count==0) wait(&fill, &m);
    get(i);
    signal(&empty);
    unlock(&m);
  }
}
//Semaphore Version
void *producer(){
  for (i=0; i<loops; ++i){
    sem_wait(&emtpy); // It is atomic so it can move here
    sem_wait(&mutex);
    put(i);
    sem_post(&mutex);
    sem_post(&full);
  }
}
 
void *consumer(){
  for (i=0; i<loops; ++i){
    sem_wait(&full); // It is atomic so it can move here
    sem_wait(&mutex);
    put(i);
    sem_post(&mutex);
    sem_wait(&empty);
  }
}
 
sem_init(&empty, 0, MAX);
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1);

Reader-Writer Locks

  • first reader aquires writelock
  • last reader releases writelock

The Dining Philosophers

  • big lock (not scalable)
  • Lock Ordering
  • add manager

并发 bug

  • Deadlock: ABBA
    • Cause
      • circular dependencies
      • encapsulation
      • Live lock: harder to handle
    • Prevention: 见表格
    • Avoidance
      • Scheduling
    • detect and recover
必要条件DescriptionpreventionRemarks
Mutual exclusion(互斥)一个资源每次只能被一个进程使用lock-freepowerful hardware instruction
Hold-and-wait(请求与保持)请求堵塞时不释放已获得的资源acquiring all lock at a time
No-preemption(不剥夺)进程已获得的资源不能强行剥夺release aquired lockmay lead to livelock
Circular wait(循环等待)若干进程之间形成头尾相接的循环等待资源关系total/partial ordering on lock acquisition不让外部调用,调用外部代码时释放锁
  • 原子性违反(AV): ABA
    • Cause:
      • 临界区间忘记上锁
      • serialization violated
    • solution: lock
  • 顺序违反(OV): BA
    • Cause
      • order flipped
      • use after free
    • solution: condition variables
  • 数据竞争
    • 两个线程对同一个共享变量的非只读并发访问

Event-based Concurrency

  • 解决问题
    • managing concurrency correctly in multi-threaded application is challenging
    • has little control over what is scheduled at a given moment in time
  • event loop
while (1){
  events = getEvents();
  for (e in events)
      processEvent(e);
}
  • select/poll
    • check whether there is any incoming I/O that should be attended to
    • select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds, struct timeval *timeout)
    • poll(struct pollfd fds[], nfds+t nfds, int timeout)
  • Problem: Blocking System Calls
    • solution: Asynchronous I/O
      • 轮训或signal
    • State Management
  • 同步/异步:线程关系,消息通知机制
    • 同步:一次性完成
    • 异步:两次完成
  • 阻塞/非阻塞:等待调用结果时的状态
    • 阻塞:做一件事
    • 非阻塞:做多件事
  • cpu密集型/io密集型
  • 协同式调度/抢占式调度

Thread API (pthread.h)

Thread

NameDescription
pthread_t线程结构体
pthread_attr_init()
pthread_create(pthread_t *, pthread_attr_t *, void * (*start_routine)(void *), void *arg)创建线程
pthread_join(pthread_t thread, void **value_ptr)等待线程结束

Lock

NameDescription
pthread_mutex_t互斥锁结构体
pthread_mutex_lock(pthread_mutex_t *mutex)int rc = pthread_mutex_init(&lock, NULL); assert(rc==0);
pthread_mutex_unlock(pthread_mutex_t *mutex)
pthread_mutex_init(pthread_mutex_t *mutex)lock=PTHREAD_MUTEX_INITIALIZER
pthread_mutex_destroy(pthread_mutex_t *mutex)

Condition

NameDescription
pthread_cond_t条件量结构体
pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
pthread_cond_signal(pthread_cond_t *cond)