Перед запуском
В этом посте будут обсуждаться проблемы использования нескольких процессоров в Xv6 и некоторые часто используемые методы блокировки, такие как Spinlock, Mutex, Semaphore. Давайте сначала поговорим о реализации мультипроцессора в Xv6.
Вызов другого процессора в Xv6
Сначала взгляните на main.c в Xv6.
int main(void) { .... // start other processors startothers(); kinit2((4*1024*1024), P2V(PHYSTOP)); userinit(); }
Продолжайте переходить к startothers () (main.c 66)
pde_t entrypgdir[]; // For entry.S static void startothers(void) { extern uchar _binary_entryother_start[], _binary_entryother_size[]; uchar *code; struct cpu *c; char *stack; // Write entry code to unused memory at 0x7000 // The linker has placed the image of entryother.S in // _binary_entryother_start code = P2V(0x7000); memmove(code, _binary_entryother_start, (uint)_binary_entryother_size ); // iterate all CPUs for(c = cpus; c < cpus+ncpu; c++) { if(c == mycpu()) // We've started already. continue; // Tell entryother.S what stack to use, where to enter, and what // pgdir to use. We cannot use kpgdir yet, because the AP processor // is running in low memory, so we use entrypgdir for the APs too. // allocate one page size from freelist, no page table stack = kalloc(); // point to the stack start addr *(void**)(code-4) = stack + KSTACKSIZE; *(void**)(code-8) = mpenter; // point to entrypgdir *(int**) = (void *) V2P(entrypgdir); } // start another processor running entry code at addr lapicstartap(c->apicid, V2P(code)); // wait for cpu to finish mpmain() while(c->started == 0) ; }
Почему в этой части все еще используется entrypgdir?
Это потому, что карта ядра еще не создается!
В соответствии :
*(void**)(code-8) = mpenter;
Он указывает на магическое прерывание между процессами (IPI) с точкой входа (mpenter ())
Затем lapicstartap () запускает код входа с 0x7000, который является entryother.S.
А теперь пора перейти к другому.
.code16 .globl start start: cli # Zero data segment registers DS, ES, and SS. # a xor a -> 0x0 xorw %ax,%ax movw %ax,%ds # data segment movw %ax,%es # extra segment movw %ax,%ss # stack segment
Переключитесь из реального в защищенный режим. Затем перейдите к start32
# Switch from real to protected mode. Use a bootstrap GDT that makes # virtual addresses map directly to physical addresses so that the # effective memory map doesn't change during the transition. lgdt gdtdesc movl %cr0 %eax orl $CR0_PE, %eax movl %eax %cr0 # Complete the transition to 32-bit protected mode by using a long jmp # to reload %cs and %eip. The segment descriptors are set up with no # translation, so that the mapping is still the identity mapping. ljmpl $(SEG_KCODE<<3), $(start32)
Теперь посмотрим на start32
//PAGEBREAK! .code32 # Tell assembler to generate 32-bit code now. start32: # Set up the protected-mode data segment registers movw $(SEG_KDATA<<3), %ax # Our data segment selector movw %ax, %ds # -> DS: Data Segment movw %ax, %es # -> ES: Extra Segment movw %ax, %ss # -> SS: Stack Segment movw $0, %ax # Zero segments not ready for use movw %ax, %fs # -> FS movw %ax, %gs # -> GS # Turn on page size extension for 4Mbyte pages movl %cr4, %eax orl $(CR4_PSE), %eax movl %eax, %cr4 # Use entrypgdir as our initial page table # Point to current stack position movl (start-12), %eax movl %eax, %cr3 # Turn on paging. movl %cr0, %eax orl $(CR0_PE|CR0_PG|CR0_WP), %eax movl %eax, %cr0 # Switch to the stack allocated by startothers() movl (start-4), %esp # Call mpenter() call *(start-8)
Важно отметить:
- % cr3 сохраняет текущую запись pgdir из (void **) (code-12) в startothers (void).
- Включите подкачку в % cr0
- Переместите (void **) (code-4) в % esp
- Вызов mpenter () из (void **) (code-8)
Продолжайте видеть mpenter () (main.c 44)
// Other CPUs jump here from entryother.S static void mpenter(void) { // switch to kernel pgtable // which is initialized in first CPU at kvmalloc(); switchkvm(); // set up CPU's kernel segment descriptors seginit(); lapicinit(); mpmain(); }
switchkvm (недействительно)
// Switch h/w page table register to the kernel-only page table, // for when no process is running. void switchkvm(void) { lcr3(V2P(kpgdir)); // switch to the kernel page table }
Чтобы избежать бесконечного цикла поиска в структуре таблицы страниц, Xv6 использует% cr3 для хранения адреса каталога таблицы страниц.
Кроме того, seginit () предназначен для инициализации дескрипторов сегментов ядра.
vm.c
void seginit(void) { struct cpu *c; // Map "logical" addresses to virtual addresses using identity map. // Cannot share a CODE descriptor for both kernel and user // because it would have to have DPL_USR, but the CPU forbids // an interrupt from CPL=0 to DPL=3. c = &cpus[cpuid()]; // permission, start, end, descriptor privilege level c->gdt[SEG_KCODE] = SEG(STA_X|STA_R, 0, 0xffffffff, 0); c->gdt[SEG_KDATA] = SEG(STA_W, 0, 0xffffffff, 0); // User Privilege Level should be 3 for interrupt c->gdt[SEG_UCODE] = SEG(STA_X|STA_R, 0, 0xffffffff, DPL_USER); c->gdt[SEG_UDATA] = SEG(STA_W, 0, 0xffffffff, DPL_USER); // x86.h 60 lgdt(c->gdt, sizeof(c->gdt)); }
cpus определяется в proc.h
// for multi process extern struct cpu cpus[NCPU]; extern int ncpu;
Изоляция процессорами
- Текущий запущенный процесс
struct proc * proc в struct cpu
- Стек ядра для прерывания
TSS хранится в глобальной таблице дескрипторов для каждого процессора и сохраняется как переменная ts в struct cpu.
Чтобы получить более подробную информацию, взгляните на struct cpu в proc.h
// Per-CPU state struct cpu { uchar apicid; // Local APIC ID struct context *scheduler; // swtch() here to enter scheduler struct taskstate ts; // Used by x86 to find stack for interrupt struct segdesc gdt[NSEGS]; // x86 global descriptor table volatile uint started; // Has the CPU started? int ncli; // Depth of pushcli nesting. int intena; // Were interrupts enabled before pushcli? struct proc *proc; // The process running on this cpu or null };
Что ж, после знакомства с тем, как Xv6 настраивает мультипроцессоры. Пора поговорить о синхронизации!
Синхронизация
Определение из Википедии:
Процесс синхронизация относится к идее, что несколько процессов должны объединиться или подтвердить связь в определенный момент, чтобы достичь соглашения или выполнить определенную последовательность действий.
Короче говоря, это решение проблемы, когда несколько потребителей хотят получить доступ к общим ресурсам. Как поддерживать согласованность ресурсов в определенной последовательности.
Простой пример
struct node { int val; struct node* next; } struct node *list = 0; void insert(int v) { struct *node n = malloc(sizeof(struct node)); n->val = v; n->next = list; list = n; }
Указатель списка всегда указывает на самый новый вставленный узел.
Вот такой случай.
Когда нет блокировки в обновлении указателя списка, процесс A и процесс B одновременно получают доступ к list = n. В чем проблема? Проблема в том, что мы ожидаем, что процесс A и процесс B должны иметь порядок связывания узла для списка.
Но на самом деле один из них становится сиротой из-за отсутствия блокировки в этих двух строках:
n->next = list; list = n;
Решение такое:
struct lock listlock; void insert(int v) { struct *node n = malloc(sizeof(struct node)); n->val = v; acquire(&listlock); n->next = list; list = n; release(&listlock); }
Теперь мы знаем важность блокировки. Давайте обсудим его реализацию.
Блокировка реализации
Spinlock
Определение. В программной инженерии S pinlock - это блокировка, которая заставляет поток, пытающийся получить ее, просто ждать в цикле («вращение»), многократно проверяя, замок имеется.
// Implementation 1 void acquire(struct spinlock *lk) { for(;;) { if(!lk->locked) { lk->locked = 1; break; } } }
Что в этом плохого ?
Если два процесса выполняют if (! Lk- ›locked) одновременно, реализация блокировки все равно не работает!
Решение: xchg () в сборке. Используйте аппаратный трюк, чтобы решить эту проблему.
xchg () делает следующее:
Заменить слово в памяти новым значением и вернуть старое значение
Вот реализация 2:
// Implementation 2 void acquire(struct spinlock *lk) { // if old value is 1 (locked), keep spinning while(xchg(&lk->locked, 1) != 0) ; }
Детали сборки (x86.h):
static inline uint xchg(volatile uint *addr, uint newval) { uint result; // The + in "+m" denotes a read-modify-write operand. // lock the bus make it atomic asm volatile("lock; xchgl %0 %1" : "+m" (*addr), "=a" (result) : "1" (newval) : "cc"); return result; }
Но при компиляции кода компилятор может оптимизировать, переупорядочив инструкции. То есть некоторые атомарные инструкции могут выйти из критического раздела.
// Implementation 3 void acquire(struct spinlock *lk) { // if old value is 1 (locked), keep spinning while(xchg(&lk->locked, 1) != 0) ; // Tell the C compiler and the processor to not move loads or stores // past this point, to ensure that the critical section's memory // references happen after the lock is acquired. __sync_synchronize(); }
Добавьте __sync_synchronize (), чтобы компилятор не переупорядочивал инструкции
Тупик
На картинке выше показано состояние тупика. Если левый процесс и правый процесс вызывают первые два запроса одновременно, оба процесса ждут, пока другой процесс снимет блокировку.
Одно условие, вызывающее тупик: прерывание
Если прерывание происходит после того, как процесс получил блокировку, которая будет использоваться прерыванием, это приведет к тому, что прерывание продолжит вращаться.
Способ решения - отключение прерывания до получения блокировки.
Вот мы видим пример в spinlock.c
void acquire(struct spinlock *lk) { pushcli(); // disable interrupts to avoid deadlock. if(holding(lk)) panic("acquire"); while(xchg(&lk->locked, 1) != 0) ; // Tell the C compiler and the processor to not move loads or stores // past this point, to ensure that the critical section's memory // references happen after the lock is acquired. __sync_synchronize(); // Record info about lock acquisition for debugging. lk->cpu = mycpu(); getcallerpcs(&lk, lk->pcs); }
- pushcli (): отключить прерывание по счетчику
- popcli (): включить прерывание, если ncli == 0
Режим сна и пробуждения
- Сон (канал)
›Перевести процесс вызова в спящий режим
›Освободить CPU для других работ
- Пробуждение (канал)
›Пробудите все процессы, спящие на канале
›Заставить сон вернуться
void * send(struct q *q, void *p) { // keep spinning if recv is busy while(q->ptr != 0) ; q->ptr = p; // Wake up processes for recv in Channel q // This is because there is something to handle wakeup(q); } void * recv(struct q *q) { void *p; // if there is no job, turn to sleep while(p=q->ptr == 0) sleep(q); q->ptr = 0; return p; }
Эта модель send & recv выглядит неплохо, но проблема все еще существует:
Объяснение: если recv войдет в цикл while перед выполнением сна, отправьте задачу put в q и немедленно просыпайтесь. Что случилось? Recv вызывает сон и больше не может проснуться из-за отправки продолжает вращаться, потому что q- ›ptr! = 0
Как это решить?
struct q { struct spinlock lock; void *ptr; }; void send(struct q *q, void *p) { acquire(&q->locked); while(q->ptr != 0) ; q->ptr = p; wakeup(q); release(&q->locked); } void recv(struct q *q) { acquire(&q->locked); while(q->ptr == 0) sleep(q, &q->locked); p = q->ptr; q->ptr = 0; release(&q->locked); return p; } void sleep(void *chan, struct spinlock *lk) { if(lk != &ptable.lock) { acquire(&ptable.lock); release(lk); } // Go to sleep proc->chan = chan; proc->state = SLEEPING; sched(); // context switch if(lk != &ptable.lock) { release(&ptable.lock); acquire(lk); } } // Wake up all processes sleeping on chan. // The ptable lock must be held. static void wakeup1(void *chan) { struct proc *p; for(p = ptable.proc; p < &ptable.proc[NPROC]; p++) if(p->state == SLEEPING && p->chan == chan) p->state = RUNNABLE; } // Wake up all processes sleeping on chan. void wakeup(void *chan) { acquire(&ptable.lock); wakeup1(chan); release(&ptable.lock); }
Объясните: все технологические операции защищены с помощью ptable.lock. Если у recv сначала нет работы, он перейдет в спящий режим. Разница в том, что если он перейдет в спящий режим, он получит еще один ptable.lock и выпустит (lk). Последнее, recv выполнит переключение контекста на планировщик.
Это почему ? Причина в том, что если он не освобождает lk, отправка застревает в тупике, ожидая & q- ›заблокированной.
Теперь отправка будет выполняться до пробуждения, потому что recv удерживает блокировку ptable- ›.
Затем recv в спящем режиме (ptable.lock) и ждем получения (lk).
А пока вернитесь, чтобы отправить продолжающееся пробуждение, и отпустите lk.
Последнее, recv может получить блокировку во сне