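This is my report for a lab exercise that implements semaphore-style synchronization with a mutex and condition variables.
Both the lab handout and the report are written in English. The course is Realtime System and Programming, and this part is about synchronization.
Lab handout: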
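As background for the idea of building a semaphore from a mutex and a condition variable, here is a minimal sketch of a counting semaphore. The type `csem_t` and the functions `csem_init`, `csem_wait`, `csem_post` and `csem_value` are hypothetical names chosen for this illustration; they are not part of POSIX or of the lab code.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical counting semaphore built from a mutex and a condition
   variable. All names here are illustrative only. */
typedef struct {
    pthread_mutex_t lock;     /* protects count */
    pthread_cond_t  nonzero;  /* signaled when count is incremented */
    int             count;    /* current semaphore value, never negative */
} csem_t;

void csem_init(csem_t *s, int initial)
{
    assert(initial >= 0);
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->nonzero, NULL);
    s->count = initial;
}

/* Like sem_wait: sleep until the value is positive, then decrement it. */
void csem_wait(csem_t *s)
{
    pthread_mutex_lock(&s->lock);
    while (s->count == 0)                 /* re-check after every wake-up */
        pthread_cond_wait(&s->nonzero, &s->lock);
    s->count--;
    pthread_mutex_unlock(&s->lock);
}

/* Like sem_post: increment the value and wake one waiter, if any. */
void csem_post(csem_t *s)
{
    pthread_mutex_lock(&s->lock);
    s->count++;
    pthread_cond_signal(&s->nonzero);
    pthread_mutex_unlock(&s->lock);
}

/* Read the current value under the lock. */
int csem_value(csem_t *s)
{
    int v;
    pthread_mutex_lock(&s->lock);
    v = s->count;
    pthread_mutex_unlock(&s->lock);
    return v;
}
```

The `while` loop around `pthread_cond_wait` is the important part: a woken thread re-tests the count while holding the mutex, so a spurious wake-up or a faster competing thread cannot push the value below zero.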
Lab 4. Multi-Threading
In the previous lab experiments, you learned and practiced multitasking programming based on the single-thread, multiple-process model. In this lab experiment, you will explore the multiple-thread, single-process model. POSIX threads are used in this lab experiment.
Threads differ from processes because the threads in a process share the address space. You should know how to create threads, how data is passed to thread functions, how return values are received, and how to synchronize threads that access shared resources. Read the lecture notes and Chapter 4 in "Advanced Linux Programming" for a discussion of threads.
An implementation using POSIX semaphores is discussed in the lectures, and the code is available.
The following are the primary objectives of this lab experiment:
Write a program that computes the following sum:
sum = 1^2 + 2^2 + 3^2 + ... + n^2
The program creates two threads: producer and consumer. Producer computes i^2, that is i * i, and puts it into the bounded buffer, for i = 1, 2, 3, ..., n. Consumer takes one item at a time from the bounded buffer and adds it to the sum.
To focus on the objectives of the lab, some considerations and parts of the code are given below.
Input data: queue size, value n
Output data: the sum produced by the cooperating threads and the direct sum for checking the result. Possibly include some extra information (try my sample programs, see the links below)
Global variables: bounded buffer (implemented by a queue), mutex and condition variables, producer_done (1 if it is done, 0 otherwise)
A bounded buffer is implemented to buffer the data generated by the producer. An array based implementation is provided. Read queue.h and queue.c to see how it is implemented.
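The direct sum used for checking the result can itself be cross-checked against the closed form n(n+1)(2n+1)/6. The sketch below is an illustration only; the function names `direct_sum` and `closed_form_sum` are hypothetical and do not appear in the lab files. It uses `long long` because, assuming a 32-bit `int`, the sum overflows an `int` once n reaches the low thousands.

```c
#include <assert.h>

/* Direct sum 1^2 + 2^2 + ... + n^2, the same loop the lab's main() uses,
   but accumulated in long long to postpone overflow. */
long long direct_sum(int n)
{
    long long sum = 0;
    assert(n >= 0);
    for (int i = 1; i <= n; i++)
        sum += (long long) i * i;
    return sum;
}

/* Closed form n(n+1)(2n+1)/6 as an independent check. */
long long closed_form_sum(int n)
{
    long long m = n;
    assert(n >= 0);
    return m * (m + 1) * (2 * m + 1) / 6;
}
```

For n = 10 both functions give 385, and for n = 100 both give 338350.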
Producer accepts the value n from pthread_create, sets the flag producer_done before exiting, and runs the following loop:
for i = 1 to n do
    try to add one item ( i * i ) to the buffer
set producer_done = 1 to signal the completion of its job
Consumer takes items from the buffer and sums them. It returns the sum:
for ( ; ; )
    if there is more data item
        get it and add it to sum
    else if producer_done
        break
No busy waiting is allowed in the implementation. Use mutex and condition variables for synchronization!
File lab4.c is available for you. It contains a main ( ) function, global variable declaration, and thread functions. This main( ) is for single-producer single-consumer version.
Modify lab4.c to implement the single-producer multiple-consumer version. The thread functions do not need to change.
I have two versions of the code:
lab4-sol : my sample solution to the lab experiment
lab4-sol2 : extended solution to the lab experiment, which allows multiple consumer threads to work together to solve the problem.
You can copy them to your directory and try them:
cp ~eric/public_html/courses/rea/labs/lab4-so* .
1. Read the lecture notes and Chapter 4 in "Advanced Linux Programming" to understand threads.
2. Study the implementation using POSIX semaphores given in the file prod-cons.c.
3. Copy my sample programs lab4-sol and lab4-sol2, and test them with different input.
4. Study the bounded buffer implementation: queue.h and queue.c.
5. Copy queue.c, queue.h and lab4.c to your work directory and complete the functions producer and consumer.
6. Run your program with different input parameters and record the results.
7. Discuss the results, the problems you encountered, and how you solved them.
8. Complete a laboratory report as described in the report format pages.
9. Modify your program so that it allows multiple consumer threads to work together to solve the problem (try my program lab4-sol2 for example).
Implementation using semaphores
/* file: prod-cons.c
   It needs queue.c queue.h
   function: producer-consumer problem using semaphores to calculate 1 ^2 + 2 ^2 + ... + n ^2
   compile: gcc -o prod-cons prod-cons.c queue.c -lpthread
   run: ./prod-cons
*/
#include <stdio.h>
#include <stdlib.h>   /* exit */
#include <pthread.h>
#include <semaphore.h>
#include "queue.h"

/* global variables */
Queue Q;
int producer_done = 0;
sem_t sem_full;   /* count no. of occupied slots in buffer */
sem_t sem_empty;  /* count no. of empty slots in buffer */
sem_t sem_mutex;  /* for mutual exclusion access to the buffer */

void *
producer (void *arg)
{
    int n;
    int i;

    n = *(int *) arg;
    for (i = 1; i <= n; i++) {
        sem_wait (&sem_empty);
        sem_wait (&sem_mutex);
        if (IsFull (Q)) {
            fprintf (stderr, "buff is full\n");
        }
        Enqueue (i * i, Q);
        fprintf (stderr, " %d^2 = %d is put to the queue\n", i, i * i);
        sem_post (&sem_mutex);
        sem_post (&sem_full);
    }
    sem_wait (&sem_mutex);
    producer_done = 1;
    sem_post (&sem_mutex);
    return NULL;
}

void *
consumer (void *arg)
{
    int item;
    int sum;
    int sem_value;

    sum = 0;
    for (;;) {
        sem_wait (&sem_mutex);
        if (IsEmpty (Q) && producer_done) {
            sem_post (&sem_mutex);
            break;
        }
        sem_post (&sem_mutex);
        /* Note: if the consumer gets here after removing the last item but
           before the producer has set producer_done, the sem_wait on
           sem_full below is never satisfied. */
        sem_getvalue (&sem_full, &sem_value);
        if (sem_value == 0)
            fprintf (stderr, "buffer is empty\n");
        sem_wait (&sem_full);
        sem_wait (&sem_mutex);
        item = Dequeue (Q);
        sem_post (&sem_mutex);
        sem_post (&sem_empty);
        sum += item;
        fprintf (stderr, "%d is removed from the queue. Current sum = %d\n", item, sum);
    }
    /* the int result travels through the void * return value */
    return (void *) (long) sum;
}

int
main ()
{
    pthread_t prod_tid, cons_tid;
    int i;
    int n;
    int sum, sum1;
    int size;
    void *ret;   /* receives the consumer's return value */

    printf ("Enter the queue size: ");
    scanf ("%d", &size);
    Q = CreateQueue (size);
    printf ("This program compute 1 ^2 + 2 ^2 + ... + n ^2\n");
    printf ("Enter n = ");
    scanf ("%d", &n);
    if (n <= 0) {
        fprintf (stderr, "please enter a positive integer n\n");
        exit (1);
    }
    sum = 0;
    for (i = 1; i <= n; i++)
        sum += i * i;
    printf ("the sum is %d\n", sum);

    /* initialize semaphores */
    sem_init (&sem_mutex, 0, 1);
    sem_init (&sem_full, 0, 0);
    sem_init (&sem_empty, 0, size);

    sum1 = 0;
    pthread_create (&prod_tid, NULL, producer, (void *) &n);
    pthread_create (&cons_tid, NULL, consumer, NULL);
    pthread_join (prod_tid, NULL);
    /* join into a void * first; writing a pointer into an int is not safe */
    pthread_join (cons_tid, &ret);
    sum1 = (int) (long) ret;
    printf ("the sum obtained by threads is %d\n", sum1);
    printf ("the sum should be %d\n", sum);
    return 0;
}
Implementation using a mutex and condition variables
/* file: lab4.c
   It needs queue.c queue.h to work with the lab4
   function: producer-consumer problem: calculate 1 ^2 + 2 ^2 + ... + n ^2
   compile: gcc -o lab4 lab4.c queue.c -lpthread
   run: ./lab4
*/
#include <stdio.h>
#include <stdlib.h>   /* exit */
#include <pthread.h>
#include "queue.h"

/* global variables */
Queue Q;
int producer_done = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_full = PTHREAD_COND_INITIALIZER;   /* signaled when an item is added */
pthread_cond_t cond_empty = PTHREAD_COND_INITIALIZER;  /* signaled when a slot is freed */

void * producer ( void * arg ){
    /* put your implementation for producer thread here */
    int n, i, temp;
    n = * (int *) arg;
    for (i = 1; i <= n; i++) {
        pthread_mutex_lock(&mutex);
        /* sleep (no busy waiting) until a slot is free */
        while (IsFull(Q))
            pthread_cond_wait(&cond_empty, &mutex);
        temp = i * i;
        Enqueue(temp, Q);
        /* set the flag in the same critical section as the last Enqueue */
        if (i == n)
            producer_done = 1;
        pthread_cond_signal(&cond_full);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

void * consumer ( void * arg ){
    /* put your implementation for consumer thread here */
    int sum = 0;
    int temp;
    for (;;) {
        pthread_mutex_lock(&mutex);
        /* wait first, then test the exit condition (see Problem 1) */
        while (IsEmpty(Q) && !producer_done)
            pthread_cond_wait(&cond_full, &mutex);
        if (IsEmpty(Q) && producer_done) {
            pthread_mutex_unlock(&mutex);
            /* pass the wake-up on so the next waiting consumer can exit too
               (see Problem 2) */
            pthread_cond_signal(&cond_full);
            break;
        }
        temp = Dequeue(Q);
        /* pthread_t is opaque; the cast is only for printing */
        printf("child %lu has got %d\n", (unsigned long) pthread_self(), temp);
        pthread_cond_signal(&cond_empty);
        pthread_mutex_unlock(&mutex);
        sum += temp;
    }
    /* the int result travels through the void * return value */
    return (void *) (long) sum;
}

int main ( ){
    int consumer_number;
    printf("please input the consumer_number:");
    if (scanf("%d", &consumer_number) != 1 || consumer_number <= 0) {
        fprintf(stderr, "please enter a positive consumer_number\n");
        exit(1);
    }
    pthread_t prod_tid, cons_tid[consumer_number];
    void *ret;   /* receives each consumer's return value */
    int i;
    int n;
    int sum = 0, consumer_sum = 0, sum1 = 0;
    int size;
    printf("Enter the queue size: "); scanf("%d", &size);
    Q = CreateQueue(size);
    printf("This program compute 1 ^2 + 2 ^2 + ... + n ^2\n");
    printf("Enter n = "); scanf("%d", &n);
    if (n <= 0) {
        fprintf(stderr, "please enter a positive integer n\n");
        exit(1);
    }
    sum = 0;
    for (i = 1; i <= n; i++)
        sum += i * i;
    pthread_create(&prod_tid, NULL, producer, (void *) &n);
    /* array indices run from 0 to consumer_number - 1 */
    for (i = 0; i < consumer_number; i++)
        pthread_create(&cons_tid[i], NULL, consumer, NULL);
    pthread_join(prod_tid, NULL);
    for (i = 0; i < consumer_number; i++) {
        /* join into a void * first; writing a pointer into an int is not safe */
        pthread_join(cons_tid[i], &ret);
        sum1 = (int) (long) ret;
        printf("the %d thread obtained %d\n", i + 1, sum1);
        consumer_sum += sum1;
    }
    printf("the sum obtained by threads is %d\n", consumer_sum);
    printf("the sum should be %d\n", sum);
    if (sum == consumer_sum)
        printf("Right! The producer-consumer threads do a great job\n");
    else
        printf("producer-consumer made wrong result. Correct it\n");
    return 0;
}
queue.h
typedef int ElementType;

#ifndef _Queue_h
#define _Queue_h

struct QueueRecord;
typedef struct QueueRecord *Queue;

int IsEmpty( Queue Q );
int IsFull( Queue Q );
Queue CreateQueue( int MaxElements );
void DestroyQueue( Queue Q );
void Enqueue( ElementType X, Queue Q );
ElementType Dequeue( Queue Q );

#endif /* _Queue_h */
queue.c
#include "queue.h"
#include <stdlib.h>
#include <stdio.h>

struct QueueRecord
{
    int Capacity;
    int Front;
    int Rear;
    int Size;
    ElementType * Array;
};

int IsEmpty( Queue Q )
{
    return Q->Size == 0;
}

int IsFull( Queue Q )
{
    return Q->Size == Q->Capacity;
}

Queue CreateQueue( int QueueSize )
{
    Queue Q;
    Q = malloc( sizeof( struct QueueRecord ) );
    if( Q == NULL ){
        fprintf(stderr, "Out of space!!!\n" );
        exit ( 1 );
    }
    Q->Array = malloc( sizeof( ElementType ) * QueueSize );
    if( Q->Array == NULL ){
        fprintf(stderr, "Out of space!!!\n" );
        exit ( 1 );
    }
    Q->Capacity = QueueSize;
    Q->Size = 0;
    Q->Front = 0;
    Q->Rear = 0;
    return Q;
}

void DestroyQueue( Queue Q )
{
    if( Q != NULL )
    {
        free( Q->Array );
        free( Q );
    }
}

/* No overflow check: the caller must test IsFull() first. */
void Enqueue( ElementType X, Queue Q )
{
    Q->Size++;
    Q->Array[ Q->Rear ] = X;
    Q->Rear ++;
    if ( Q->Rear == Q->Capacity )
        Q->Rear = 0;
}

/* No underflow check: the caller must test IsEmpty() first. */
ElementType Dequeue( Queue Q )
{
    ElementType x;
    x = Q->Array[Q->Front];
    Q->Size--;
    Q->Front ++;
    if ( Q->Front == Q->Capacity )
        Q->Front = 0;
    return x;
}
Lab report
1 Introduction
In laboratory experiment four, we deal with multithreading. With multithreading we can run different parts of the program concurrently within the same process. Synchronization in the sample program is done with semaphores. In the modified program it is done with a mutex and condition variables.
2 Results of Running Sample Program
2.1 Program function and related concepts
The sample program uses semaphores to synchronize the threads and protect the global data. The producer thread computes i^2 (i * i) for i = 1, 2, 3, ..., n and puts each value into the bounded buffer. The consumer takes one item at a time from the bounded buffer and adds it to the sum.
3 Program Development and Running Result
3.1 Program development
The first task was to use a mutex and condition variables instead of semaphores. At first, the program was translated directly from semaphores to a mutex and condition variables without any other changes. The program worked when there was only one producer and one consumer. We also found that main cannot access Q->Size, because struct QueueRecord is defined in queue.c and queue.h only declares it. During the second task, implementing multiple consumers, some problems arose; they are described in Section 4, Analysis of Results.
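The access problem follows from queue.h exposing the queue as a pointer to an incomplete struct. One common way around it, sketched here under the assumption that queue.c may be edited, is a small accessor function. `QueueSize` is a hypothetical name for this illustration; the provided queue.h has no such function.

```c
#include <assert.h>
#include <stddef.h>

/* What a header like queue.h exposes: an incomplete type, a pointer to it,
   and (hypothetically) one extra accessor prototype. */
struct QueueRecord;
typedef struct QueueRecord *Queue;
int QueueSize(Queue Q);   /* hypothetical accessor, not in the lab's queue.h */

/* What only the implementation file sees: the full definition. */
struct QueueRecord {
    int Capacity;
    int Front;
    int Rear;
    int Size;
    int *Array;
};

/* Callers get the element count without knowing the struct layout. */
int QueueSize(Queue Q)
{
    assert(Q != NULL);
    return Q->Size;
}
```

In a multithreaded program such an accessor would still have to be called with the mutex held, like IsEmpty() and IsFull().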
4 Analysis of Results
Problem 1: If the consumer function executes the if-statement (which exits the for loop) before the while-statement (which waits for the producer to add an item to the queue), one thread gets stuck in the for loop with no escape:
Consumer1: Waits on cond_full.
Consumer2: Waits on cond_full.
Producer: Finishes its Enqueue() and signals cond_full.
Consumer1: Wakes up and goes past the if-statement to Dequeue().
Consumer2: Blocked on the mutex.
Consumer1: Sets Q->Size to 0, unlocks the mutex, and terminates.
Consumer2: Wakes up and calls Dequeue(), which sets Q->Size to -1.
Consumer2: Loops forever: Q->Size is negative, so the queue never counts as empty, and each Dequeue() decrements Q->Size again.
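The last two steps of this trace can be reproduced without threads. The sketch below uses a hypothetical stand-in, `struct mini_queue`, that keeps the same Size and Front bookkeeping as queue.c but is not the lab's code. It shows that an unchecked dequeue on an empty queue leaves the size at -1, after which the emptiness test never succeeds again, and it adds a guarded variant for comparison.

```c
#include <assert.h>
#include <stddef.h>

#define MQ_CAP 4   /* fixed capacity, chosen arbitrarily for the sketch */

/* Hypothetical miniature of the lab's queue record. */
struct mini_queue {
    int size;
    int front;
    int array[MQ_CAP];
};

int mq_is_empty(const struct mini_queue *q)
{
    return q->size == 0;
}

/* Unchecked, like Dequeue() in queue.c: it decrements size even when
   the queue is already empty. */
int mq_dequeue(struct mini_queue *q)
{
    int x = q->array[q->front];
    q->size--;
    q->front = (q->front + 1) % MQ_CAP;
    return x;
}

/* Guarded variant: returns 1 and stores the item in *out, or returns 0
   and leaves the queue untouched when it is empty. */
int mq_try_dequeue(struct mini_queue *q, int *out)
{
    assert(out != NULL);
    if (mq_is_empty(q))
        return 0;
    *out = mq_dequeue(q);
    return 1;
}
```

Calling `mq_dequeue` on a zero-initialized queue leaves `size` at -1 and `mq_is_empty` returns 0 from then on, which is the state Consumer2 ends up in. In the threaded program the equivalent guard is to re-test IsEmpty(Q) under the mutex after every wake-up and before every Dequeue().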
Problem 2: If a consumer exits the loop without signaling cond_full, the other consumer threads can starve: they keep waiting on cond_full and are never woken. Whether this happens depends on whether any consumer is waiting on the condition variable when the last item is dequeued. It is more likely when n and the number of consumers are larger and the queue size is smaller. It also depends on the current CPU workload.
Consumer1: Waits on cond_full.
Consumer2: Waits on cond_full.
Producer: Finishes its Enqueue() and signals cond_full.
Consumer1: Wakes up, dequeues some items, unlocks the mutex, leaves the for loop, and exits the function.
Consumer2: Is still waiting on cond_full.
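An alternative to having each exiting consumer pass the signal on is to let the producer wake all waiting consumers at once with pthread_cond_broadcast when it finishes. The following is a minimal sketch of that variant, not the lab solution: `struct shared`, `run_sum` and the inline ring buffer are hypothetical stand-ins for the lab's globals and queue.c, and `i * i` is assumed to fit in an `int`.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical shared state replacing the lab's globals and queue.c. */
struct shared {
    pthread_mutex_t mutex;
    pthread_cond_t  not_empty, not_full;
    int *buf;
    int capacity, head, count;
    int n, producer_done;
};

static void *producer(void *arg)
{
    struct shared *s = arg;
    for (int i = 1; i <= s->n; i++) {
        pthread_mutex_lock(&s->mutex);
        while (s->count == s->capacity)
            pthread_cond_wait(&s->not_full, &s->mutex);
        s->buf[(s->head + s->count) % s->capacity] = i * i;
        s->count++;
        pthread_cond_signal(&s->not_empty);
        pthread_mutex_unlock(&s->mutex);
    }
    pthread_mutex_lock(&s->mutex);
    s->producer_done = 1;
    pthread_cond_broadcast(&s->not_empty);  /* wake every waiting consumer */
    pthread_mutex_unlock(&s->mutex);
    return NULL;
}

static void *consumer(void *arg)
{
    struct shared *s = arg;
    long sum = 0;
    for (;;) {
        pthread_mutex_lock(&s->mutex);
        while (s->count == 0 && !s->producer_done)
            pthread_cond_wait(&s->not_empty, &s->mutex);
        if (s->count == 0) {                /* empty and producer is done */
            pthread_mutex_unlock(&s->mutex);
            break;
        }
        int item = s->buf[s->head];
        s->head = (s->head + 1) % s->capacity;
        s->count--;
        pthread_cond_signal(&s->not_full);
        pthread_mutex_unlock(&s->mutex);
        sum += item;
    }
    return (void *) sum;                    /* partial sum of this consumer */
}

/* Runs one producer and `consumers` consumers; returns the combined sum. */
long run_sum(int n, int capacity, int consumers)
{
    struct shared s;
    pthread_t prod, *cons;
    long total = 0;

    assert(n > 0 && capacity > 0 && consumers > 0);
    pthread_mutex_init(&s.mutex, NULL);
    pthread_cond_init(&s.not_empty, NULL);
    pthread_cond_init(&s.not_full, NULL);
    s.buf = malloc(sizeof *s.buf * capacity);
    cons = malloc(sizeof *cons * consumers);
    assert(s.buf != NULL && cons != NULL);
    s.capacity = capacity;
    s.head = s.count = s.producer_done = 0;
    s.n = n;

    pthread_create(&prod, NULL, producer, &s);
    for (int i = 0; i < consumers; i++)
        pthread_create(&cons[i], NULL, consumer, &s);
    pthread_join(prod, NULL);
    for (int i = 0; i < consumers; i++) {
        void *ret;
        pthread_join(cons[i], &ret);
        total += (long) ret;
    }
    free(cons);
    free(s.buf);
    pthread_cond_destroy(&s.not_full);
    pthread_cond_destroy(&s.not_empty);
    pthread_mutex_destroy(&s.mutex);
    return total;
}
```

A call such as `run_sum(100, 3, 4)` should return 338350 regardless of how the threads interleave. Because the flag is set and the broadcast sent under the mutex, a consumer either sees producer_done before it waits or is already waiting and gets woken, so no consumer is left behind.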
5 Conclusion
In a multithreaded environment, shared resources have to be protected carefully. It is easy to make a mistake when using a mutex and condition variables, so the program has to be tested many times with different parameters, because a problem may appear only with specific parameters in a specific situation (e.g. when the workload is high).
Article URL: http://com.8s8s.com/it/it29087.htm