#include "threadpool.h"
#include "common.h"
/*
 * Create a thread pool with `thread_num` worker threads and a job queue
 * bounded by `queue_max_num`.
 *
 * Parameters:
 *   thread_num    - number of worker threads to start; must be > 0.
 *   queue_max_num - queue capacity compared against the current length in
 *                   threadpool_add_job(); must not be 0, because a zero
 *                   capacity would make every add block forever.
 *
 * Returns the new pool, or NULL on any failure. On failure every resource
 * acquired so far (memory, mutex, condition variables, already-started
 * workers) is released, so nothing is leaked and no half-built pool escapes.
 */
threadpool_t *threadpool_init(int thread_num, int queue_max_num)
{
    threadpool_t *pool = NULL;
    int created = 0;
    int i;

    if (thread_num <= 0 || queue_max_num == 0)
    {
        bug("invalid threadpool parameters\n");
        return NULL;
    }

    pool = malloc(sizeof(*pool));
    if (NULL == pool)
    {
        bug("failed to malloc threadpool\n");
        return NULL;
    }

    pool->thread_num = thread_num;
    pool->queue_max_num = queue_max_num;
    pool->queue_cur_num = 0;
    pool->head = NULL;
    pool->tail = NULL;
    pool->pthreads = NULL;
    pool->queue_close = 0;
    pool->pool_close = 0;

    if (pthread_mutex_init(&(pool->mutex), NULL))
    {
        bug("pthread_mutex_init\n");
        goto err_free_pool;
    }
    if (pthread_cond_init(&(pool->queue_empty), NULL))
    {
        bug("pthread_cond_init\n");
        goto err_destroy_mutex;
    }
    if (pthread_cond_init(&(pool->queue_not_empty), NULL))
    {
        bug("pthread_cond_init\n");
        goto err_destroy_queue_empty;
    }
    if (pthread_cond_init(&(pool->queue_not_full), NULL))
    {
        bug("pthread_cond_init\n");
        goto err_destroy_queue_not_empty;
    }

    /* Guard the size computation against wrap-around before allocating. */
    if ((size_t)thread_num <= (size_t)-1 / sizeof(pthread_t))
    {
        pool->pthreads = malloc(sizeof(pthread_t) * (size_t)thread_num);
    }
    if (NULL == pool->pthreads)
    {
        bug("malloc error\n");
        goto err_destroy_queue_not_full;
    }

    for (created = 0; created < thread_num; ++created)
    {
        /* pthread_create reports failure with a non-zero error number,
         * never a negative value. */
        if (pthread_create(&(pool->pthreads[created]), NULL, threadpool_function, (void *)pool) != 0)
        {
            bug("pthread_create\n");
            goto err_stop_threads;
        }
    }
    return pool;

err_stop_threads:
    /* Ask the workers that did start to exit, then reap them, so none of
     * them can touch the pool after it is freed below. */
    pthread_mutex_lock(&(pool->mutex));
    pool->queue_close = 1;
    pool->pool_close = 1;
    pthread_mutex_unlock(&(pool->mutex));
    pthread_cond_broadcast(&(pool->queue_not_empty));
    for (i = 0; i < created; ++i)
    {
        pthread_join(pool->pthreads[i], NULL);
    }
    free(pool->pthreads);
err_destroy_queue_not_full:
    pthread_cond_destroy(&(pool->queue_not_full));
err_destroy_queue_not_empty:
    pthread_cond_destroy(&(pool->queue_not_empty));
err_destroy_queue_empty:
    pthread_cond_destroy(&(pool->queue_empty));
err_destroy_mutex:
    pthread_mutex_destroy(&(pool->mutex));
err_free_pool:
    free(pool);
    return NULL;
}
/*
 * Append a job to the tail of the pool's queue.
 *
 * Parameters:
 *   pool            - target pool; NULL is rejected.
 *   p_callback_func - function a worker will invoke; NULL is rejected.
 *   arg             - opaque pointer handed to p_callback_func unchanged.
 *
 * Blocks on queue_not_full while the queue length equals queue_max_num and
 * neither close flag is set.
 *
 * Returns 0 once the job is queued, or -1 when an argument is NULL, the
 * queue or pool has been closed, or the job node cannot be allocated.
 */
int threadpool_add_job(threadpool_t *pool, callback_func p_callback_func, void *arg)
{
    job_t *node;
    int closed;
    int rc = -1;

    if (!pool || !p_callback_func)
    {
        return -1;
    }

    pthread_mutex_lock(&pool->mutex);

    /* Wait for a free slot; give up waiting as soon as a close flag is seen. */
    for (;;)
    {
        closed = (pool->queue_close || pool->pool_close);
        if (closed || pool->queue_cur_num != pool->queue_max_num)
        {
            break;
        }
        pthread_cond_wait(&pool->queue_not_full, &pool->mutex);
    }

    if (!closed)
    {
        node = malloc(sizeof(*node));
        if (node != NULL)
        {
            node->p_callback_func = p_callback_func;
            node->arg = arg;
            node->next = NULL;

            if (pool->head != NULL)
            {
                /* Non-empty queue: link behind the current tail. */
                pool->tail->next = node;
                pool->tail = node;
            }
            else
            {
                /* Queue was empty: the new node is both ends, and idle
                 * workers are woken. */
                pool->head = node;
                pool->tail = node;
                pthread_cond_broadcast(&pool->queue_not_empty);
            }
            pool->queue_cur_num++;
            rc = 0;
        }
    }

    pthread_mutex_unlock(&pool->mutex);
    return rc;
}
/*
 * Worker thread entry point.
 *
 * `arg` is the threadpool_t the worker belongs to. The worker repeatedly
 * takes the job at the head of the queue, runs its callback outside the
 * lock, and frees the job node. It exits (via pthread_exit) once it
 * observes pool_close set while holding the mutex.
 */
void *threadpool_function(void *arg)
{
    threadpool_t *pool = (threadpool_t *)arg;
    job_t *pjob = NULL;

    while (1)
    {
        pthread_mutex_lock(&(pool->mutex));

        /* Sleep until there is work or the pool is shutting down. */
        while ((pool->queue_cur_num == 0) && !pool->pool_close)
        {
            pthread_cond_wait(&(pool->queue_not_empty), &(pool->mutex));
        }
        if (pool->pool_close)
        {
            pthread_mutex_unlock(&(pool->mutex));
            pthread_exit(NULL);
        }

        /* Detach the head job from the queue. */
        pool->queue_cur_num--;
        pjob = pool->head;
        if (pool->queue_cur_num == 0)
        {
            pool->head = pool->tail = NULL;
        }
        else
        {
            pool->head = pjob->next;
        }

        /* Wake a thread waiting for the queue to drain. */
        if (pool->queue_cur_num == 0)
        {
            pthread_cond_signal(&(pool->queue_empty));
        }
        /* A slot was just freed, so producers blocked on a full queue must
         * always be woken - including when the queue became empty. Skipping
         * that case strands a producer forever when queue_max_num is 1. */
        pthread_cond_broadcast(&(pool->queue_not_full));

        pthread_mutex_unlock(&(pool->mutex));

        /* Run the job without holding the lock so other workers and
         * producers can make progress meanwhile. */
        (*(pjob->p_callback_func))(pjob->arg);
        free(pjob);
        pjob = NULL;
    }
}
/*
 * Shut the pool down and release everything it owns.
 *
 * Sequence: mark the queue closed, wait on queue_empty until the queue
 * length reaches zero, mark the pool closed, wake all waiters, join every
 * worker thread, then destroy the synchronization objects and free memory.
 *
 * Returns 0 in all cases, including when `pool` is NULL or both close
 * flags were already set on entry.
 *
 * NOTE(review): the pool is freed here, so callers presumably must ensure
 * no other thread is still inside threadpool_add_job() on it - confirm.
 */
int threadpool_destory(threadpool_t *pool)
{
    int idx = 0;
    job_t *node = NULL;

    if (!pool)
    {
        return 0;
    }

    pthread_mutex_lock(&pool->mutex);
    if (pool->queue_close && pool->pool_close)
    {
        /* Shutdown was already carried out (or is past the drain stage). */
        pthread_mutex_unlock(&pool->mutex);
        return 0;
    }

    /* Stop accepting jobs, then let the workers drain what is queued. */
    pool->queue_close = 1;
    while (pool->queue_cur_num != 0)
    {
        pthread_cond_wait(&pool->queue_empty, &pool->mutex);
    }
    pool->pool_close = 1;
    pthread_mutex_unlock(&pool->mutex);

    /* Release idle workers and any producers blocked on a full queue. */
    pthread_cond_broadcast(&pool->queue_not_empty);
    pthread_cond_broadcast(&pool->queue_not_full);

    while (idx < pool->thread_num)
    {
        pthread_join(pool->pthreads[idx], NULL);
        ++idx;
    }

    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->queue_empty);
    pthread_cond_destroy(&pool->queue_not_empty);
    pthread_cond_destroy(&pool->queue_not_full);
    free(pool->pthreads);

    /* Free any job nodes still linked into the queue. */
    while ((node = pool->head) != NULL)
    {
        pool->head = node->next;
        free(node);
    }

    free(pool);
    return 0;
}