Перед запуском

В этом посте будут обсуждаться проблемы использования нескольких процессоров в Xv6 и некоторые часто используемые методы блокировки, такие как Spinlock, Mutex, Semaphore. Давайте сначала поговорим о реализации мультипроцессора в Xv6.

Вызов другого процессора в Xv6

Сначала взгляните на main.c в Xv6.

int
main(void)
{
  ....
  // start other processors
  startothers(); 
  kinit2((4*1024*1024), P2V(PHYSTOP));
  userinit();
}

Продолжайте переходить к startothers () (main.c 66)

pde_t entrypgdir[];  // For entry.S
static void 
startothers(void)
{
  extern uchar _binary_entryother_start[], _binary_entryother_size[];
  uchar *code;
  struct cpu *c;
  char *stack;
  
  // Write entry code to unused memory at 0x7000
  // The linker has placed the image of entryother.S in
  // _binary_entryother_start
  code = P2V(0x7000);
  memmove(code, _binary_entryother_start, (uint)_binary_entryother_size );
  
  // iterate all CPUs
  for(c = cpus; c < cpus+ncpu; c++) {
    if(c == mycpu())  // We've started already.
      continue;
    
    // Tell entryother.S what stack to use, where to enter, and what
    // pgdir to use. We cannot use kpgdir yet, because the AP processor
    // is running in low  memory, so we use entrypgdir for the APs too.
    // allocate one page size from freelist, no page table
    stack = kalloc();
    // point to the stack start addr
    *(void**)(code-4) = stack + KSTACKSIZE;
    *(void**)(code-8) = mpenter;
    // point to entrypgdir
    *(int**) = (void *) V2P(entrypgdir);
  }
  // start another processor running entry code at addr
  lapicstartap(c->apicid, V2P(code));
  // wait for cpu to finish mpmain()
  while(c->started == 0)
    ;
}

Почему в этой части все еще используется entrypgdir?

Это потому, что карта ядра еще не создается!

В соответствии :

*(void**)(code-8) = mpenter;

Он указывает на магическое прерывание между процессами (IPI) с точкой входа (mpenter ())

Затем lapicstartap () запускает код входа с 0x7000, который является entryother.S.

А теперь пора перейти к другому.

.code16           
.globl start
start:
  cli
  # Zero data segment registers DS, ES, and SS.
  # a xor a -> 0x0
  xorw    %ax,%ax
  movw    %ax,%ds # data segment
  movw    %ax,%es # extra segment
  movw    %ax,%ss # stack segment

Переключитесь из реального в защищенный режим. Затем перейдите к start32

  # Switch from real to protected mode.  Use a bootstrap GDT that makes
  # virtual addresses map directly to physical addresses so that the
  # effective memory map doesn't change during the transition.
  lgdt gdtdesc
  movl %cr0 %eax
  orl $CR0_PE, %eax
  movl %eax %cr0
  # Complete the transition to 32-bit protected mode by using a long jmp
  # to reload %cs and %eip.  The segment descriptors are set up with no
  # translation, so that the mapping is still the identity mapping.
  ljmpl    $(SEG_KCODE<<3), $(start32)

Теперь посмотрим на start32

//PAGEBREAK!
.code32  # Tell assembler to generate 32-bit code now.
start32:
  # Set up the protected-mode data segment registers
  movw    $(SEG_KDATA<<3), %ax    # Our data segment selector
  movw    %ax, %ds                # -> DS: Data Segment
  movw    %ax, %es                # -> ES: Extra Segment
  movw    %ax, %ss                # -> SS: Stack Segment
  movw    $0, %ax                 # Zero segments not ready for use
  movw    %ax, %fs                # -> FS
  movw    %ax, %gs                # -> GS
# Turn on page size extension for 4Mbyte pages
  movl    %cr4, %eax
  orl     $(CR4_PSE), %eax
  movl    %eax, %cr4
  
  # Use entrypgdir as our initial page table
  # Point to current stack position
  movl    (start-12), %eax
  movl    %eax, %cr3
 
  # Turn on paging.
  movl    %cr0, %eax
  orl     $(CR0_PE|CR0_PG|CR0_WP), %eax
  movl    %eax, %cr0
# Switch to the stack allocated by startothers()
  movl    (start-4), %esp
  # Call mpenter()
  call  *(start-8)

Важно отметить:

  1. % cr3 сохраняет текущую запись pgdir из (void **) (code-12) в startothers (void).
  2. Включите подкачку в % cr0
  3. Переместите (void **) (code-4) в % esp
  4. Вызов mpenter () из (void **) (code-8)

Продолжайте видеть mpenter () (main.c 44)

// Other CPUs jump here from entryother.S
static void
mpenter(void)
{
  // switch to kernel pgtable
  // which is initialized in first CPU at kvmalloc();
  switchkvm();
  // set up CPU's kernel segment descriptors
  seginit();
  lapicinit();
  mpmain();
}

switchkvm (недействительно)

// Switch h/w page table register to the kernel-only page table,
// for when no process is running.
void switchkvm(void)
{
  lcr3(V2P(kpgdir));  // switch to the kernel page table
}

Чтобы избежать бесконечного цикла поиска в структуре таблицы страниц, Xv6 использует% cr3 для хранения адреса каталога таблицы страниц.

Кроме того, seginit () предназначен для инициализации дескрипторов сегментов ядра.

vm.c

void
seginit(void)
{
  struct cpu *c;
  // Map "logical" addresses to virtual addresses using identity map.
  // Cannot share a CODE descriptor for both kernel and user
  // because it would have to have DPL_USR, but the CPU forbids
  // an interrupt from CPL=0 to DPL=3.
  c = &cpus[cpuid()];
  // permission, start, end, descriptor privilege level
  c->gdt[SEG_KCODE] = SEG(STA_X|STA_R, 0, 0xffffffff, 0);
  c->gdt[SEG_KDATA] = SEG(STA_W, 0, 0xffffffff, 0);
  
  // User Privilege Level should be 3 for interrupt
  c->gdt[SEG_UCODE] = SEG(STA_X|STA_R, 0, 0xffffffff, DPL_USER);
  c->gdt[SEG_UDATA] = SEG(STA_W, 0, 0xffffffff, DPL_USER);
  
  // x86.h 60
  lgdt(c->gdt, sizeof(c->gdt));
}

cpus определяется в proc.h

// for multi process
extern struct cpu cpus[NCPU];
extern int ncpu;

Изоляция процессорами

  • Текущий запущенный процесс

struct proc * proc в struct cpu

  • Стек ядра для прерывания

TSS хранится в глобальной таблице дескрипторов для каждого процессора и сохраняется как переменная ts в struct cpu.

Чтобы получить более подробную информацию, взгляните на struct cpu в proc.h

// Per-CPU state
struct cpu {
  uchar apicid;                // Local APIC ID
  struct context *scheduler;   // swtch() here to enter scheduler
  struct taskstate ts;         // Used by x86 to find stack for interrupt
  struct segdesc gdt[NSEGS];   // x86 global descriptor table
  volatile uint started;       // Has the CPU started?
  int ncli;                    // Depth of pushcli nesting.
  int intena;                  // Were interrupts enabled before pushcli?
  struct proc *proc;           // The process running on this cpu or null
};

Что ж, после знакомства с тем, как Xv6 настраивает мультипроцессоры. Пора поговорить о синхронизации!

Синхронизация

Определение из Википедии:

Процесс синхронизация относится к идее, что несколько процессов должны объединиться или подтвердить связь в определенный момент, чтобы достичь соглашения или выполнить определенную последовательность действий.

Короче говоря, это решение проблемы, когда несколько потребителей хотят получить доступ к общим ресурсам. Как поддерживать согласованность ресурсов в определенной последовательности.

Простой пример

struct node {
  int val;
  struct node* next;
}
struct node *list = 0;
void insert(int v) {
  struct *node n = malloc(sizeof(struct node));
  n->val = v;
  n->next = list;
  list = n;
}

Указатель списка всегда указывает на самый новый вставленный узел.

Вот такой случай.

Когда нет блокировки в обновлении указателя списка, процесс A и процесс B одновременно получают доступ к list = n. В чем проблема? Проблема в том, что мы ожидаем, что процесс A и процесс B должны иметь порядок связывания узла для списка.

Но на самом деле один из них становится сиротой из-за отсутствия блокировки в этих двух строках:

n->next = list;
list = n;

Решение такое:

struct lock listlock;
void insert(int v) {
  struct *node n = malloc(sizeof(struct node));
  n->val = v;
  acquire(&listlock);
  n->next = list;
  list = n;
  release(&listlock);
}

Теперь мы знаем важность блокировки. Давайте обсудим его реализацию.

Блокировка реализации

Spinlock

Определение. В программной инженерии S pinlock - это блокировка, которая заставляет поток, пытающийся получить ее, просто ждать в цикле («вращение»), многократно проверяя, замок имеется.

// Implementation 1
void 
acquire(struct spinlock *lk)
{
  for(;;)
  {
    if(!lk->locked)
    {
      lk->locked = 1;
      break;
    }
  }
}

Что в этом плохого ?

Если два процесса выполняют if (! Lk- ›locked) одновременно, реализация блокировки все равно не работает!

Решение: xchg () в сборке. Используйте аппаратный трюк, чтобы решить эту проблему.

xchg () делает следующее:

Заменить слово в памяти новым значением и вернуть старое значение

Вот реализация 2:

// Implementation 2
void 
acquire(struct spinlock *lk)
{
  // if old value is 1 (locked), keep spinning
  while(xchg(&lk->locked, 1) != 0)
    ;
}

Детали сборки (x86.h):

static inline uint
xchg(volatile uint *addr, uint newval)
{
  uint result;
  // The + in "+m" denotes a read-modify-write operand.
  // lock the bus make it atomic
  asm volatile("lock; xchgl %0 %1" :
               "+m" (*addr), "=a" (result) :
               "1" (newval) :
               "cc");
  return result;
}

Но при компиляции кода компилятор может оптимизировать, переупорядочив инструкции. То есть некоторые атомарные инструкции могут выйти из критического раздела.

// Implementation 3
void 
acquire(struct spinlock *lk)
{
  // if old value is 1 (locked), keep spinning
  while(xchg(&lk->locked, 1) != 0)
    ;
  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that the critical section's memory
  // references happen after the lock is acquired.
  __sync_synchronize();
}

Добавьте __sync_synchronize (), чтобы компилятор не переупорядочивал инструкции

Тупик

На картинке выше показано состояние тупика. Если левый процесс и правый процесс вызывают первые два запроса одновременно, оба процесса ждут, пока другой процесс снимет блокировку.

Одно условие, вызывающее тупик: прерывание

Если прерывание происходит после того, как процесс получил блокировку, которая будет использоваться прерыванием, это приведет к тому, что прерывание продолжит вращаться.

Способ решения - отключение прерывания до получения блокировки.

Вот мы видим пример в spinlock.c

void
acquire(struct spinlock *lk)
{
  pushcli(); // disable interrupts to avoid deadlock.
  if(holding(lk))
    panic("acquire");
  
  while(xchg(&lk->locked, 1) != 0)
    ;
  
  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that the critical section's memory
  // references happen after the lock is acquired.
  __sync_synchronize();
// Record info about lock acquisition for debugging.
  lk->cpu = mycpu();
  getcallerpcs(&lk, lk->pcs);
}
  • pushcli (): отключить прерывание по счетчику
  • popcli (): включить прерывание, если ncli == 0

Режим сна и пробуждения

  • Сон (канал)

›Перевести процесс вызова в спящий режим

›Освободить CPU для других работ

  • Пробуждение (канал)

›Пробудите все процессы, спящие на канале

›Заставить сон вернуться

void *
send(struct q *q, void *p)
{
  // keep spinning if recv is busy
  while(q->ptr != 0)
    ;
  q->ptr = p;
  // Wake up processes for recv in Channel q
  // This is because there is something to handle
  wakeup(q);
}
void *
recv(struct q *q)
{
  void *p;
  // if there is no job, turn to sleep
  while(p=q->ptr == 0)
    sleep(q);
  q->ptr = 0;
  return p;
}

Эта модель send & recv выглядит неплохо, но проблема все еще существует:

Объяснение: если recv войдет в цикл while перед выполнением сна, отправьте задачу put в q и немедленно просыпайтесь. Что случилось? Recv вызывает сон и больше не может проснуться из-за отправки продолжает вращаться, потому что q- ›ptr! = 0

Как это решить?

struct q {
  struct spinlock lock;
  void *ptr;
};
void send(struct q *q, void *p)
{
  acquire(&q->locked);
  while(q->ptr != 0)
    ;
  q->ptr = p;
  wakeup(q);
  release(&q->locked);
}
void recv(struct q *q)
{
  acquire(&q->locked);
  while(q->ptr == 0)
    sleep(q, &q->locked);
  p = q->ptr;
  q->ptr = 0;
  release(&q->locked);
  return p;
}
void sleep(void *chan, struct spinlock *lk)
{
  if(lk != &ptable.lock) {
    acquire(&ptable.lock);
    release(lk);
  }
  
  // Go to sleep
  proc->chan = chan;
  proc->state = SLEEPING;
  sched(); // context switch
  if(lk != &ptable.lock) {
    release(&ptable.lock);
    acquire(lk);
  }
}
// Wake up all processes sleeping on chan.
// The ptable lock must be held.
static void
wakeup1(void *chan)
{
  struct proc *p;
for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
    if(p->state == SLEEPING && p->chan == chan)
      p->state = RUNNABLE;
}
// Wake up all processes sleeping on chan.
void wakeup(void *chan)
{
  acquire(&ptable.lock);
  wakeup1(chan);
  release(&ptable.lock);
}

Объясните: все технологические операции защищены с помощью ptable.lock. Если у recv сначала нет работы, он перейдет в спящий режим. Разница в том, что если он перейдет в спящий режим, он получит еще один ptable.lock и выпустит (lk). Последнее, recv выполнит переключение контекста на планировщик.

Это почему ? Причина в том, что если он не освобождает lk, отправка застревает в тупике, ожидая & q- ›заблокированной.

Теперь отправка будет выполняться до пробуждения, потому что recv удерживает блокировку ptable- ›.

Затем recv в спящем режиме (ptable.lock) и ждем получения (lk).

А пока вернитесь, чтобы отправить продолжающееся пробуждение, и отпустите lk.

Последнее, recv может получить блокировку во сне