--- bin/gearadmin.cc 2014-02-12 08:05:28.000000000 +0800 +++ bin/gearadmin.cc 2014-08-19 23:30:11.491562622 +0800 @@ -137,10 +137,15 @@ ("server-version", "Fetch the version number for the server.") ("server-verbose", "Fetch the verbose setting for the server.") ("create-function", boost::program_options::value(), "Create the function from the server.") - ("cancel-job", boost::program_options::value(), "Remove a given job from the server's queue") ("drop-function", boost::program_options::value(), "Drop the function from the server.") - ("show-unique-jobs", "Show unique jobs on server.") - ("show-jobs", "Show all jobs on the server.") + ("cancel-unique-job", boost::program_options::value(), "Remove a given unique job from the server's queue") + ("cancel-job", boost::program_options::value(), "Remove a given handle job from the server's queue") + ("cancel-unique-jobs", boost::program_options::value >()->multitoken()->zero_tokens(), "Remove unique jobs from the server's queue. ARG can be uniques (separated by ',' . '-' if there are functions but no unique) and functions (separated by ',') .") + ("cancel-jobs", boost::program_options::value >()->multitoken()->zero_tokens(), "Remove handle jobs from the server's queue. ARG can be handles (separated by ',' . '-' if there are functions but no handle) and functions (separated by ',') .") + ("show-unique-job", boost::program_options::value(), "Show a given unique job on the server.") + ("show-job", boost::program_options::value(), "Show a given handle job on the server.") + ("show-unique-jobs", boost::program_options::value >()->multitoken()->zero_tokens(), "Show unique jobs on the server. ARG can be uniques (separated by ',' . '-' if there are functions but no unique) and functions (separated by ',') .") + ("show-jobs", boost::program_options::value >()->multitoken()->zero_tokens(), "Show handle jobs on the server. ARG can be handles (separated by ',' . '-' if there are functions but no handle) and functions (separated by ',') .") ("getpid", "Get Process ID for the server.") ("status", "Status for the server.") ("workers", "Workers for the server.") @@ -176,16 +181,21 @@ } if (vm.count("server-version") == 0 and - vm.count("server-verbose") == 0 and - vm.count("create-function") == 0 and - vm.count("drop-function") == 0 and - vm.count("cancel-job") == 0 and - vm.count("show-unique-jobs") == 0 and - vm.count("show-jobs") == 0 and - vm.count("getpid") == 0 and - vm.count("status") == 0 and - vm.count("workers") == 0 and - vm.count("shutdown") == 0) + vm.count("server-verbose") == 0 and + vm.count("create-function") == 0 and + vm.count("drop-function") == 0 and + vm.count("cancel-unique-job") == 0 and + vm.count("cancel-job") == 0 and + vm.count("cancel-unique-jobs") == 0 and + vm.count("cancel-jobs") == 0 and + vm.count("show-unique-job") == 0 and + vm.count("show-job") == 0 and + vm.count("show-unique-jobs") == 0 and + vm.count("show-jobs") == 0 and + vm.count("getpid") == 0 and + vm.count("status") == 0 and + vm.count("workers") == 0 and + vm.count("shutdown") == 0) { std::cout << "No option execution operation given." << std::endl << std::endl; std::cout << desc << std::endl; @@ -217,6 +227,14 @@ instance.push(new util::Operation(util_literal_param("verbose\r\n"))); } + if (vm.count("cancel-unique-job")) + { + std::string execute(util_literal_param("cancel unique job ")); + execute.append(vm["cancel-unique-job"].as()); + execute.append("\r\n"); + instance.push(new util::Operation(execute.c_str(), execute.size())); + } + if (vm.count("cancel-job")) { std::string execute(util_literal_param("cancel job ")); @@ -225,14 +243,68 @@ instance.push(new util::Operation(execute.c_str(), execute.size())); } + if (vm.count("cancel-unique-jobs")) + { + std::string execute(util_literal_param("cancel unique jobs")); + std::vector jobs_argv= vm["cancel-unique-jobs"].as >(); + for (size_t i= 0; i < jobs_argv.size(); i++) + { + execute.append(' ' + jobs_argv[i]); + } + execute.append("\r\n"); + instance.push(new util::Operation(execute.c_str(), execute.size())); + } + + if (vm.count("cancel-jobs")) + { + std::string execute(util_literal_param("cancel jobs")); + std::vector jobs_argv= vm["cancel-jobs"].as >(); + for (size_t i= 0; i < jobs_argv.size(); i++) + { + execute.append(' ' + jobs_argv[i]); + } + execute.append("\r\n"); + instance.push(new util::Operation(execute.c_str(), execute.size())); + } + + if (vm.count("show-unique-job")) + { + std::string execute(util_literal_param("show unique job ")); + execute.append(vm["show-unique-job"].as()); + execute.append("\r\n"); + instance.push(new util::Operation(execute.c_str(), execute.size())); + } + + if (vm.count("show-job")) + { + std::string execute(util_literal_param("show job ")); + execute.append(vm["show-job"].as()); + execute.append("\r\n"); + instance.push(new util::Operation(execute.c_str(), execute.size())); + } + if (vm.count("show-unique-jobs")) { - instance.push(new util::Operation(util_literal_param("show unique jobs\r\n"))); + std::string execute(util_literal_param("show unique jobs")); + std::vector jobs_argv= vm["show-unique-jobs"].as >(); + for (size_t i= 0; i < jobs_argv.size(); i++) + { + execute.append(' ' + jobs_argv[i]); + } + execute.append("\r\n"); + instance.push(new util::Operation(execute.c_str(), execute.size())); } if (vm.count("show-jobs")) { - instance.push(new util::Operation(util_literal_param("show jobs\r\n"))); + std::string execute(util_literal_param("show jobs")); + std::vector jobs_argv= vm["show-jobs"].as >(); + for (size_t i= 0; i < jobs_argv.size(); i++) + { + execute.append(' ' + jobs_argv[i]); + } + execute.append("\r\n"); + instance.push(new util::Operation(execute.c_str(), execute.size())); } if (vm.count("drop-function")) --- libgearman/add.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/add.cc 2014-08-03 01:52:11.415246532 +0800 @@ -63,6 +63,8 @@ switch (command) { case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: + case GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH: + case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH: case GEARMAN_COMMAND_SUBMIT_JOB_SCHED: case GEARMAN_COMMAND_SUBMIT_JOB_BG: case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG: @@ -257,9 +259,12 @@ break; case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: + case GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH: + case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH: rc= libgearman::protocol::submit_epoch(task->client->universal, task->send, final_unique, + command, function, workload, when); --- libgearman/client.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/client.cc 2014-08-12 17:55:05.858785257 +0800 @@ -213,9 +213,9 @@ */ static gearman_return_t _client_do_background(gearman_client_st* client_shell, gearman_command_t command, - gearman_string_t &function, - gearman_unique_t &unique, - gearman_string_t &workload, + const char *function_name, + const char *unique, + const void *workload_str, size_t workload_size, gearman_job_handle_t job_handle) { if (client_shell == NULL or client_shell->impl() == NULL) @@ -226,19 +226,23 @@ Client* client= client_shell->impl(); client->universal.reset_error(); - if (gearman_size(function) == 0) + if (function_name == NULL or strlen(function_name) == 0) { return gearman_error(client->universal, GEARMAN_INVALID_ARGUMENT, "function argument was empty"); } client->_do_handle[0]= 0; // Reset the job_handle we store in client + gearman_string_t function= { gearman_string_param_cstr(function_name) }; + gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0); + gearman_string_t workload= { static_cast(workload_str), workload_size }; + client->universal.options.no_new_data= true; gearman_task_st* do_task= add_task(*client, NULL, client, command, function, - unique, + local_unique, workload, time_t(0), gearman_actions_do_default()); @@ -266,6 +270,69 @@ return ret; } +/* + Real epoch do function. +*/ +static gearman_return_t _client_do_epoch(gearman_client_st* client_shell, + gearman_command_t command, + const char *function_name, + const char *unique, + const void *workload_str, size_t workload_size, + time_t when, + gearman_job_handle_t job_handle) +{ + if (client_shell == NULL or client_shell->impl() == NULL) + { + return GEARMAN_INVALID_ARGUMENT; + } + + Client* client= client_shell->impl(); + client->universal.reset_error(); + + if (function_name == NULL or strlen(function_name) == 0) + { + return gearman_error(client->universal, GEARMAN_INVALID_ARGUMENT, "function argument was empty"); + } + + client->_do_handle[0]= 0; // Reset the job_handle we store in client + + gearman_string_t function= { gearman_string_param_cstr(function_name) }; + gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0); + gearman_string_t workload= { static_cast(workload_str), workload_size }; + + client->universal.options.no_new_data= true; + gearman_task_st* do_task= add_task(*client, NULL, + client, + command, + function, + local_unique, + workload, + when, + gearman_actions_do_default()); + client->universal.options.no_new_data= false; + + if (do_task == NULL) + { + gearman_task_free(do_task); + return client->universal.error_code(); + } + assert(do_task); + do_task->impl()->type= GEARMAN_TASK_KIND_DO; + + gearman_return_t ret= gearman_client_run_block_tasks(client, do_task); + + if (job_handle) + { + strncpy(job_handle, do_task->impl()->job_handle, GEARMAN_JOB_HANDLE_SIZE); + } + strncpy(client->_do_handle, do_task->impl()->job_handle, GEARMAN_JOB_HANDLE_SIZE); + client->new_tasks= 0; + client->running_tasks= 0; + gearman_task_free(do_task); + + return ret; +} + /* * Public Definitions @@ -546,6 +613,15 @@ } } +void gearman_client_set_ssl(gearman_client_st *client_shell, bool ssl, + const char *ca_file, const char *certificate, const char *key_file) +{ + if (client_shell && client_shell->impl()) + { + gearman_universal_set_ssl(client_shell->impl()->universal, ssl, ca_file, certificate, key_file); + } +} + void *gearman_client_context(const gearman_client_st *client_shell) { if (client_shell and client_shell->impl()) @@ -599,7 +675,7 @@ { Client* client= client_shell->impl(); - if (gearman_connection_create(client->universal, host, port) == false) + if (gearman_connection_create(client->universal, host, port) == NULL) { assert(client->error_code() != GEARMAN_SUCCESS); return client->error_code(); @@ -614,7 +690,7 @@ gearman_return_t Client::add_server(const char *host, const char* service_) { - if (gearman_connection_create(universal, host, service_) == false) + if (gearman_connection_create(universal, host, service_) == NULL) { assert(error_code() != GEARMAN_SUCCESS); return error_code(); @@ -772,14 +848,10 @@ size_t workload_size, gearman_job_handle_t job_handle) { - gearman_string_t function= { gearman_string_param_cstr(function_name) }; - gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0); - gearman_string_t workload= { static_cast(workload_str), workload_size }; - return _client_do_background(client_shell, GEARMAN_COMMAND_SUBMIT_JOB_BG, - function, - local_unique, - workload, + function_name, + unique, + workload_str, workload_size, job_handle); } @@ -790,14 +862,10 @@ size_t workload_size, gearman_job_handle_t job_handle) { - gearman_string_t function= { gearman_string_param_cstr(function_name) }; - gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0); - gearman_string_t workload= { static_cast(workload_str), workload_size }; - return _client_do_background(client_shell, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG, - function, - local_unique, - workload, + function_name, + unique, + workload_str, workload_size, job_handle); } @@ -808,17 +876,61 @@ size_t workload_size, gearman_job_handle_t job_handle) { - gearman_string_t function= { gearman_string_param_cstr(function_name) }; - gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0); - gearman_string_t workload= { static_cast(workload_str), workload_size }; - return _client_do_background(client_shell, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG, - function, - local_unique, - workload, + function_name, + unique, + workload_str, workload_size, job_handle); } +gearman_return_t gearman_client_do_epoch(gearman_client_st *client_shell, + const char *function_name, + const char *unique, + const void *workload_str, + size_t workload_size, + time_t when, + gearman_job_handle_t job_handle) +{ + return _client_do_epoch(client_shell, GEARMAN_COMMAND_SUBMIT_JOB_EPOCH, + function_name, + unique, + workload_str, workload_size, + when, + job_handle); +} + +gearman_return_t gearman_client_do_high_epoch(gearman_client_st *client_shell, + const char *function_name, + const char *unique, + const void *workload_str, + size_t workload_size, + time_t when, + gearman_job_handle_t job_handle) +{ + return _client_do_epoch(client_shell, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH, + function_name, + unique, + workload_str, workload_size, + when, + job_handle); +} + +gearman_return_t gearman_client_do_low_epoch(gearman_client_st *client_shell, + const char *function_name, + const char *unique, + const void *workload_str, + size_t workload_size, + time_t when, + gearman_job_handle_t job_handle) +{ + return _client_do_epoch(client_shell, GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH, + function_name, + unique, + workload_str, workload_size, + when, + job_handle); +} + gearman_status_t gearman_client_unique_status(gearman_client_st *client_shell, const char *unique, size_t unique_length) { @@ -926,27 +1038,27 @@ if (gearman_success(ret)) { - if (is_known) + if (is_known != NULL) { *is_known= do_task->impl()->options.is_known; } - if (is_running) + if (is_running != NULL) { *is_running= do_task->impl()->options.is_running; } - if (numerator) + if (numerator != NULL) { *numerator= do_task->impl()->numerator; } - if (denominator) + if (denominator != NULL) { *denominator= do_task->impl()->denominator; } - if (is_known == false and is_running == false) + if (is_known == NULL and is_running == NULL) { if (do_task->impl()->options.is_running) { @@ -960,22 +1072,22 @@ } else { - if (is_known) + if (is_known != NULL) { *is_known= false; } - if (is_running) + if (is_running != NULL) { *is_running= false; } - if (numerator) + if (numerator != NULL) { *numerator= 0; } - if (denominator) + if (denominator != NULL) { *denominator= 0; } @@ -1217,6 +1329,96 @@ } +gearman_task_st *gearman_client_add_task_epoch(gearman_client_st *client, + gearman_task_st *task, + void *context, + const char *function, + const char *unique, + const void *workload, size_t workload_size, + time_t when, + gearman_return_t *ret_ptr) +{ + gearman_return_t unused; + if (ret_ptr == NULL) + { + ret_ptr= &unused; + } + + if (client == NULL or client->impl() == NULL) + { + *ret_ptr= GEARMAN_INVALID_ARGUMENT; + return NULL; + } + + return add_task_ptr(*(client->impl()), task, context, GEARMAN_COMMAND_SUBMIT_JOB_EPOCH, + function, + unique, + workload, workload_size, + when, + *ret_ptr, + client->impl()->actions); +} + +gearman_task_st *gearman_client_add_task_high_epoch(gearman_client_st *client, + gearman_task_st *task, + void *context, + const char *function, + const char *unique, + const void *workload, size_t workload_size, + time_t when, + gearman_return_t *ret_ptr) +{ + gearman_return_t unused; + if (ret_ptr == NULL) + { + ret_ptr= &unused; + } + + if (client == NULL or client->impl() == NULL) + { + *ret_ptr= GEARMAN_INVALID_ARGUMENT; + return NULL; + } + + return add_task_ptr(*(client->impl()), task, context, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH, + function, + unique, + workload, workload_size, + when, + *ret_ptr, + client->impl()->actions); +} + +gearman_task_st *gearman_client_add_task_low_epoch(gearman_client_st *client, + gearman_task_st *task, + void *context, + const char *function, + const char *unique, + const void *workload, size_t workload_size, + time_t when, + gearman_return_t *ret_ptr) +{ + gearman_return_t unused; + if (ret_ptr == NULL) + { + ret_ptr= &unused; + } + + if (client == NULL or client->impl() == NULL) + { + *ret_ptr= GEARMAN_INVALID_ARGUMENT; + return NULL; + } + + return add_task_ptr(*(client->impl()), task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH, + function, + unique, + workload, workload_size, + when, + *ret_ptr, + client->impl()->actions); +} + gearman_task_st *gearman_client_add_task_status(gearman_client_st *client_shell, gearman_task_st *task_shell, void *context, --- libgearman/client.hpp 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/client.hpp 2014-07-07 14:23:34.249335984 +0800 @@ -121,6 +121,7 @@ void enable_ssl() { + return; if (getenv("GEARMAND_CA_CERTIFICATE")) { gearman_client_add_options(_client, GEARMAN_CLIENT_SSL); --- libgearman/command.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/command.cc 2014-08-06 16:52:24.835443718 +0800 @@ -93,12 +93,14 @@ { "GEARMAN_GRAB_JOB_ALL", GEARMAN_COMMAND_GRAB_JOB_ALL, 0, false }, { "GEARMAN_JOB_ASSIGN_ALL", GEARMAN_COMMAND_JOB_ASSIGN_ALL, 4, true }, { "GEARMAN_GET_STATUS_UNIQUE", GEARMAN_COMMAND_GET_STATUS_UNIQUE, 1, false }, - { "GEARMAN_STATUS_RES_UNIQUE", GEARMAN_COMMAND_STATUS_RES_UNIQUE, 6, false } + { "GEARMAN_STATUS_RES_UNIQUE", GEARMAN_COMMAND_STATUS_RES_UNIQUE, 6, false }, + { "GEARMAN_SUBMIT_JOB_HIGH_EPOCH", GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH, 3, true }, + { "GEARMAN_SUBMIT_JOB_LOW_EPOCH", GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH, 3, true } }; const char *gearman_strcommand(gearman_command_t command) { - if ((command >= GEARMAN_COMMAND_TEXT) and (command <= GEARMAN_COMMAND_STATUS_RES_UNIQUE)) + if ((command >= GEARMAN_COMMAND_TEXT) and (command < GEARMAN_COMMAND_MAX)) { const char* str= gearmand_command_info_list[command].name; @@ -112,7 +114,7 @@ const char *gearman_enum_strcommand(gearman_command_t command) { - if ((command >= GEARMAN_COMMAND_TEXT) and (command <= GEARMAN_COMMAND_STATUS_RES_UNIQUE)) + if ((command >= GEARMAN_COMMAND_TEXT) and (command < GEARMAN_COMMAND_MAX)) { return gearmand_command_info_list[command].name; } --- libgearman/command.gperf 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/command.gperf 2014-08-02 20:00:02.315307073 +0800 @@ -106,4 +106,6 @@ JOB_ASSIGN_ALL, GEARMAN_COMMAND_JOB_ASSIGN_ALL GET_STATUS_UNIQUE, GEARMAN_COMMAND_GET_STATUS_UNIQUE STATUS_RES_UNIQUE, GEARMAN_COMMAND_STATUS_RES_UNIQUE +SUBMIT_JOB_HIGH_EPOCH, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH +SUBMIT_JOB_LOW_EPOCH, GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH %% --- libgearman/command.hpp 2014-02-12 08:06:02.000000000 +0800 +++ libgearman/command.hpp 2014-08-02 20:06:36.115304168 +0800 @@ -85,12 +85,12 @@ }; #include -#define TOTAL_KEYWORDS 43 +#define TOTAL_KEYWORDS 45 #define MIN_WORD_LENGTH 4 #define MAX_WORD_LENGTH 28 #define MIN_HASH_VALUE 4 -#define MAX_HASH_VALUE 73 -/* maximum key range = 70, duplicates = 0 */ +#define MAX_HASH_VALUE 75 +/* maximum key range = 72, duplicates = 0 */ #ifndef GPERF_DOWNCASE #define GPERF_DOWNCASE 1 @@ -150,32 +150,32 @@ { static const unsigned char asso_values[] = { - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 35, 5, 25, 10, 25, - 74, 0, 25, 74, 5, 74, 10, 74, 0, 15, - 30, 40, 15, 10, 0, 0, 74, 0, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 35, 5, 25, - 10, 25, 74, 0, 25, 74, 5, 74, 10, 74, - 0, 15, 30, 40, 15, 10, 0, 0, 74, 0, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74 + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 25, 5, 25, 25, 25, + 76, 0, 10, 76, 10, 76, 5, 76, 45, 30, + 0, 20, 45, 10, 0, 0, 76, 0, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 25, 5, 25, + 25, 25, 76, 0, 10, 76, 10, 76, 5, 76, + 45, 30, 0, 20, 45, 10, 0, 0, 76, 0, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76 }; return len + asso_values[(unsigned char)str[len - 1]] + asso_values[(unsigned char)str[0]]; } @@ -184,34 +184,26 @@ { #line 66 "libgearman/command.gperf" {"TEXT", GEARMAN_COMMAND_TEXT}, -#line 76 "libgearman/command.gperf" - {"NO_JOB", GEARMAN_COMMAND_NO_JOB}, +#line 70 "libgearman/command.gperf" + {"PRE_SLEEP", GEARMAN_COMMAND_PRE_SLEEP}, #line 95 "libgearman/command.gperf" {"WORK_WARNING", GEARMAN_COMMAND_WORK_WARNING }, #line 75 "libgearman/command.gperf" {"GRAB_JOB", GEARMAN_COMMAND_GRAB_JOB}, -#line 91 "libgearman/command.gperf" - {"WORK_EXCEPTION", GEARMAN_COMMAND_WORK_EXCEPTION }, -#line 77 "libgearman/command.gperf" - {"JOB_ASSIGN", GEARMAN_COMMAND_JOB_ASSIGN }, -#line 71 "libgearman/command.gperf" - {"UNUSED", GEARMAN_COMMAND_UNUSED}, #line 80 "libgearman/command.gperf" {"WORK_FAIL",GEARMAN_COMMAND_WORK_FAIL}, +#line 105 "libgearman/command.gperf" + {"GRAB_JOB_ALL", GEARMAN_COMMAND_GRAB_JOB_ALL }, #line 81 "libgearman/command.gperf" {"GET_STATUS",GEARMAN_COMMAND_GET_STATUS}, #line 78 "libgearman/command.gperf" {"WORK_STATUS", GEARMAN_COMMAND_WORK_STATUS}, -#line 105 "libgearman/command.gperf" - {"GRAB_JOB_ALL", GEARMAN_COMMAND_GRAB_JOB_ALL }, #line 84 "libgearman/command.gperf" {"SUBMIT_JOB_BG", GEARMAN_COMMAND_SUBMIT_JOB_BG }, #line 99 "libgearman/command.gperf" {"SUBMIT_JOB_LOW", GEARMAN_COMMAND_SUBMIT_JOB_LOW }, #line 73 "libgearman/command.gperf" {"SUBMIT_JOB", GEARMAN_COMMAND_SUBMIT_JOB }, -#line 74 "libgearman/command.gperf" - {"JOB_CREATED", GEARMAN_COMMAND_JOB_CREATED}, #line 100 "libgearman/command.gperf" {"SUBMIT_JOB_LOW_BG", GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG }, #line 98 "libgearman/command.gperf" @@ -220,54 +212,66 @@ {"JOB_ASSIGN_ALL", GEARMAN_COMMAND_JOB_ASSIGN_ALL }, #line 86 "libgearman/command.gperf" {"STATUS_RES", GEARMAN_COMMAND_STATUS_RES}, +#line 71 "libgearman/command.gperf" + {"UNUSED", GEARMAN_COMMAND_UNUSED}, #line 103 "libgearman/command.gperf" {"SUBMIT_REDUCE_JOB", GEARMAN_COMMAND_SUBMIT_REDUCE_JOB}, -#line 88 "libgearman/command.gperf" - {"SET_CLIENT_ID", GEARMAN_COMMAND_SET_CLIENT_ID}, -#line 72 "libgearman/command.gperf" - {"NOOP", GEARMAN_COMMAND_NOOP}, -#line 93 "libgearman/command.gperf" - {"OPTION_RES", GEARMAN_COMMAND_OPTION_RES}, -#line 101 "libgearman/command.gperf" - {"SUBMIT_JOB_SCHED", GEARMAN_COMMAND_SUBMIT_JOB_SCHED }, +#line 96 "libgearman/command.gperf" + {"GRAB_JOB_UNIQ", GEARMAN_COMMAND_GRAB_JOB_UNIQ}, +#line 94 "libgearman/command.gperf" + {"WORK_DATA", GEARMAN_COMMAND_WORK_DATA }, +#line 87 "libgearman/command.gperf" + {"SUBMIT_JOB_HIGH", GEARMAN_COMMAND_SUBMIT_JOB_HIGH }, +#line 102 "libgearman/command.gperf" + {"SUBMIT_JOB_EPOCH", GEARMAN_COMMAND_SUBMIT_JOB_EPOCH }, #line 79 "libgearman/command.gperf" {"WORK_COMPLETE", GEARMAN_COMMAND_WORK_COMPLETE }, #line 89 "libgearman/command.gperf" {"CAN_DO_TIMEOUT", GEARMAN_COMMAND_CAN_DO_TIMEOUT}, -#line 69 "libgearman/command.gperf" - {"RESET_ABILITIES", GEARMAN_COMMAND_RESET_ABILITIES}, +#line 110 "libgearman/command.gperf" + {"SUBMIT_JOB_LOW_EPOCH", GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH }, +#line 109 "libgearman/command.gperf" + {"SUBMIT_JOB_HIGH_EPOCH", GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH }, #line 107 "libgearman/command.gperf" {"GET_STATUS_UNIQUE", GEARMAN_COMMAND_GET_STATUS_UNIQUE}, #line 83 "libgearman/command.gperf" {"ECHO_RES", GEARMAN_COMMAND_ECHO_RES }, -#line 94 "libgearman/command.gperf" - {"WORK_DATA", GEARMAN_COMMAND_WORK_DATA }, -#line 85 "libgearman/command.gperf" - {"ERROR", GEARMAN_COMMAND_ERROR}, -#line 67 "libgearman/command.gperf" - {"CAN_DO", GEARMAN_COMMAND_CAN_DO}, -#line 68 "libgearman/command.gperf" - {"CANT_DO", GEARMAN_COMMAND_CANT_DO}, -#line 104 "libgearman/command.gperf" - {"SUBMIT_REDUCE_JOB_BACKGROUND", GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND}, -#line 87 "libgearman/command.gperf" - {"SUBMIT_JOB_HIGH", GEARMAN_COMMAND_SUBMIT_JOB_HIGH }, -#line 102 "libgearman/command.gperf" - {"SUBMIT_JOB_EPOCH", GEARMAN_COMMAND_SUBMIT_JOB_EPOCH }, -#line 108 "libgearman/command.gperf" - {"STATUS_RES_UNIQUE", GEARMAN_COMMAND_STATUS_RES_UNIQUE}, -#line 96 "libgearman/command.gperf" - {"GRAB_JOB_UNIQ", GEARMAN_COMMAND_GRAB_JOB_UNIQ}, #line 90 "libgearman/command.gperf" {"ALL_YOURS", GEARMAN_COMMAND_ALL_YOURS}, #line 97 "libgearman/command.gperf" {"JOB_ASSIGN_UNIQ", GEARMAN_COMMAND_JOB_ASSIGN_UNIQ }, +#line 74 "libgearman/command.gperf" + {"JOB_CREATED", GEARMAN_COMMAND_JOB_CREATED}, +#line 88 "libgearman/command.gperf" + {"SET_CLIENT_ID", GEARMAN_COMMAND_SET_CLIENT_ID}, +#line 72 "libgearman/command.gperf" + {"NOOP", GEARMAN_COMMAND_NOOP}, +#line 93 "libgearman/command.gperf" + {"OPTION_RES", GEARMAN_COMMAND_OPTION_RES}, +#line 101 "libgearman/command.gperf" + {"SUBMIT_JOB_SCHED", GEARMAN_COMMAND_SUBMIT_JOB_SCHED }, +#line 108 "libgearman/command.gperf" + {"STATUS_RES_UNIQUE", GEARMAN_COMMAND_STATUS_RES_UNIQUE}, +#line 82 "libgearman/command.gperf" + {"ECHO_REQ", GEARMAN_COMMAND_ECHO_REQ }, +#line 76 "libgearman/command.gperf" + {"NO_JOB", GEARMAN_COMMAND_NO_JOB}, +#line 91 "libgearman/command.gperf" + {"WORK_EXCEPTION", GEARMAN_COMMAND_WORK_EXCEPTION }, #line 92 "libgearman/command.gperf" {"OPTION_REQ", GEARMAN_COMMAND_OPTION_REQ}, -#line 70 "libgearman/command.gperf" - {"PRE_SLEEP", GEARMAN_COMMAND_PRE_SLEEP}, -#line 82 "libgearman/command.gperf" - {"ECHO_REQ", GEARMAN_COMMAND_ECHO_REQ } +#line 67 "libgearman/command.gperf" + {"CAN_DO", GEARMAN_COMMAND_CAN_DO}, +#line 68 "libgearman/command.gperf" + {"CANT_DO", GEARMAN_COMMAND_CANT_DO}, +#line 104 "libgearman/command.gperf" + {"SUBMIT_REDUCE_JOB_BACKGROUND", GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND}, +#line 77 "libgearman/command.gperf" + {"JOB_ASSIGN", GEARMAN_COMMAND_JOB_ASSIGN }, +#line 69 "libgearman/command.gperf" + {"RESET_ABILITIES", GEARMAN_COMMAND_RESET_ABILITIES}, +#line 85 "libgearman/command.gperf" + {"ERROR", GEARMAN_COMMAND_ERROR} }; const struct gearman_command_string_st * @@ -290,8 +294,8 @@ goto compare; } break; - case 7: - if (len == 6) + case 5: + if (len == 9) { resword = &gearman_command_string_st[1]; goto compare; @@ -312,278 +316,292 @@ } break; case 10: - if (len == 14) + if (len == 9) { resword = &gearman_command_string_st[4]; goto compare; } break; - case 11: - if (len == 10) + case 13: + if (len == 12) { resword = &gearman_command_string_st[5]; goto compare; } break; - case 12: - if (len == 6) + case 16: + if (len == 10) { resword = &gearman_command_string_st[6]; goto compare; } break; - case 15: - if (len == 9) + case 17: + if (len == 11) { resword = &gearman_command_string_st[7]; goto compare; } break; - case 16: - if (len == 10) + case 19: + if (len == 13) { resword = &gearman_command_string_st[8]; goto compare; } break; - case 17: - if (len == 11) + case 20: + if (len == 14) { resword = &gearman_command_string_st[9]; goto compare; } break; - case 18: - if (len == 12) + case 21: + if (len == 10) { resword = &gearman_command_string_st[10]; goto compare; } break; - case 19: - if (len == 13) + case 23: + if (len == 17) { resword = &gearman_command_string_st[11]; goto compare; } break; - case 20: - if (len == 14) + case 24: + if (len == 18) { resword = &gearman_command_string_st[12]; goto compare; } break; - case 21: - if (len == 10) + case 25: + if (len == 14) { resword = &gearman_command_string_st[13]; goto compare; } break; - case 22: - if (len == 11) + case 26: + if (len == 10) { resword = &gearman_command_string_st[14]; goto compare; } break; - case 23: - if (len == 17) + case 27: + if (len == 6) { resword = &gearman_command_string_st[15]; goto compare; } break; - case 24: - if (len == 18) + case 28: + if (len == 17) { resword = &gearman_command_string_st[16]; goto compare; } break; - case 25: - if (len == 14) + case 29: + if (len == 13) { resword = &gearman_command_string_st[17]; goto compare; } break; - case 26: - if (len == 10) + case 30: + if (len == 9) { resword = &gearman_command_string_st[18]; goto compare; } break; - case 28: - if (len == 17) + case 31: + if (len == 15) { resword = &gearman_command_string_st[19]; goto compare; } break; - case 29: - if (len == 13) + case 32: + if (len == 16) { resword = &gearman_command_string_st[20]; goto compare; } break; - case 30: - if (len == 4) + case 34: + if (len == 13) { resword = &gearman_command_string_st[21]; goto compare; } break; - case 31: - if (len == 10) + case 35: + if (len == 14) { resword = &gearman_command_string_st[22]; goto compare; } break; - case 32: - if (len == 16) + case 36: + if (len == 20) { resword = &gearman_command_string_st[23]; goto compare; } break; - case 34: - if (len == 13) + case 37: + if (len == 21) { resword = &gearman_command_string_st[24]; goto compare; } break; - case 35: - if (len == 14) + case 38: + if (len == 17) { resword = &gearman_command_string_st[25]; goto compare; } break; - case 36: - if (len == 15) + case 39: + if (len == 8) { resword = &gearman_command_string_st[26]; goto compare; } break; - case 38: - if (len == 17) + case 40: + if (len == 9) { resword = &gearman_command_string_st[27]; goto compare; } break; - case 39: - if (len == 8) + case 41: + if (len == 15) { resword = &gearman_command_string_st[28]; goto compare; } break; - case 40: - if (len == 9) + case 42: + if (len == 11) { resword = &gearman_command_string_st[29]; goto compare; } break; - case 41: - if (len == 5) + case 44: + if (len == 13) { resword = &gearman_command_string_st[30]; goto compare; } break; - case 42: - if (len == 6) + case 45: + if (len == 4) { resword = &gearman_command_string_st[31]; goto compare; } break; - case 43: - if (len == 7) + case 46: + if (len == 10) { resword = &gearman_command_string_st[32]; goto compare; } break; - case 44: - if (len == 28) + case 47: + if (len == 16) { resword = &gearman_command_string_st[33]; goto compare; } break; - case 46: - if (len == 15) + case 48: + if (len == 17) { resword = &gearman_command_string_st[34]; goto compare; } break; - case 47: - if (len == 16) + case 49: + if (len == 8) { resword = &gearman_command_string_st[35]; goto compare; } break; - case 48: - if (len == 17) + case 52: + if (len == 6) { resword = &gearman_command_string_st[36]; goto compare; } break; - case 49: - if (len == 13) + case 55: + if (len == 14) { resword = &gearman_command_string_st[37]; goto compare; } break; - case 50: - if (len == 9) + case 56: + if (len == 10) { resword = &gearman_command_string_st[38]; goto compare; } break; - case 56: - if (len == 15) + case 57: + if (len == 6) { resword = &gearman_command_string_st[39]; goto compare; } break; - case 61: - if (len == 10) + case 58: + if (len == 7) { resword = &gearman_command_string_st[40]; goto compare; } break; - case 65: - if (len == 9) + case 59: + if (len == 28) { resword = &gearman_command_string_st[41]; goto compare; } break; - case 69: - if (len == 8) + case 61: + if (len == 10) { resword = &gearman_command_string_st[42]; goto compare; } break; + case 66: + if (len == 15) + { + resword = &gearman_command_string_st[43]; + goto compare; + } + break; + case 71: + if (len == 5) + { + resword = &gearman_command_string_st[44]; + goto compare; + } + break; } return 0; compare: @@ -597,5 +615,5 @@ } return 0; } -#line 109 "libgearman/command.gperf" +#line 111 "libgearman/command.gperf" --- libgearman/connection.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/connection.cc 2014-08-03 23:19:28.255335115 +0800 @@ -678,6 +678,8 @@ ERR_error_string_n(SSL_get_error(_ssl, 0), errorString, sizeof(errorString)); return gearman_error(universal, GEARMAN_COULD_NOT_CONNECT, errorString); } + + SSL_set_connect_state(_ssl); } #endif @@ -858,7 +860,11 @@ case SSL_ERROR_SSL: default: { - char errorString[80]; + if (ERR_peek_last_error()) + { + ssl_error= ERR_peek_last_error(); + } + char errorString[SSL_ERROR_SIZE]; ERR_error_string_n(ssl_error, errorString, sizeof(errorString)); close_socket(); return gearman_universal_set_error(universal, GEARMAN_LOST_CONNECTION, GEARMAN_AT, "SSL failure(%s)", errorString); @@ -1163,7 +1169,11 @@ case SSL_ERROR_SSL: default: { - char errorString[80]; + if (ERR_peek_last_error()) + { + ssl_error= ERR_peek_last_error(); + } + char errorString[SSL_ERROR_SIZE]; ERR_error_string_n(ssl_error, errorString, sizeof(errorString)); close_socket(); return gearman_universal_set_error(universal, GEARMAN_LOST_CONNECTION, GEARMAN_AT, "SSL failure(%s)", errorString); --- libgearman/execute.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/execute.cc 2014-08-02 20:27:11.055295057 +0800 @@ -68,6 +68,20 @@ return GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG; } +static inline gearman_command_t pick_command_by_priority_epoch(const gearman_job_priority_t &arg) +{ + if (arg == GEARMAN_JOB_PRIORITY_NORMAL) + { + return GEARMAN_COMMAND_SUBMIT_JOB_EPOCH; + } + else if (arg == GEARMAN_JOB_PRIORITY_HIGH) + { + return GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH; + } + + return GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH; +} + gearman_task_st *gearman_execute(gearman_client_st *client_shell, @@ -117,7 +131,7 @@ case GEARMAN_TASK_ATTR_EPOCH: task= add_task(*client, context, - GEARMAN_COMMAND_SUBMIT_JOB_EPOCH, + pick_command_by_priority_epoch(task_attr->priority), function, unique, arguments->value, --- libgearman/interface/universal.hpp 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/interface/universal.hpp 2014-08-21 09:48:05.371966783 +0800 @@ -38,6 +38,7 @@ #pragma once +#include #include "libgearman/allocator.hpp" #include "libgearman/server_options.hpp" #include "libgearman/interface/packet.hpp" @@ -68,12 +69,18 @@ bool non_blocking; bool no_new_data; bool _ssl; + struct gearman_vector_st *_ssl_ca_file; + struct gearman_vector_st *_ssl_certificate; + struct gearman_vector_st *_ssl_key; Options() : dont_track_packets(false), non_blocking(false), no_new_data(false), - _ssl(false) + _ssl(false), + _ssl_ca_file(NULL), + _ssl_certificate(NULL), + _ssl_key(NULL) { } } options; gearman_verbose_t verbose; @@ -208,6 +215,11 @@ const char* ssl_ca_file() const { + if (options._ssl_ca_file && options._ssl_ca_file->size()) + { + return options._ssl_ca_file->c_str(); + } + if (getenv("GEARMAND_CA_CERTIFICATE")) { return getenv("GEARMAND_CA_CERTIFICATE"); @@ -216,8 +228,27 @@ return GEARMAND_CA_CERTIFICATE; } + void ssl_ca_file(const char* ssl_ca_file_) + { + gearman_string_free(options._ssl_ca_file); + size_t ssl_ca_file_size_= 0; + if (ssl_ca_file_ && (ssl_ca_file_size_= strlen(ssl_ca_file_))) + { + options._ssl_ca_file= gearman_string_create(NULL, ssl_ca_file_, ssl_ca_file_size_); + } + else + { + options._ssl_ca_file= NULL; + } + } + const char* ssl_certificate() const { + if (options._ssl_certificate && options._ssl_certificate->size()) + { + return options._ssl_certificate->c_str(); + } + if (getenv("GEARMAN_CLIENT_PEM")) { return getenv("GEARMAN_CLIENT_PEM"); @@ -226,8 +257,27 @@ return GEARMAN_CLIENT_PEM; } + void ssl_certificate(const char *ssl_certificate_) + { + gearman_string_free(options._ssl_certificate); + size_t ssl_certificate_size_= 0; + if (ssl_certificate_ && (ssl_certificate_size_= strlen(ssl_certificate_))) + { + options._ssl_certificate= gearman_string_create(NULL, ssl_certificate_, ssl_certificate_size_); + } + else + { + options._ssl_certificate= NULL; + } + } + const char* ssl_key() const { + if (options._ssl_key && options._ssl_key->size()) + { + return options._ssl_key->c_str(); + } + if (getenv("GEARMAN_CLIENT_KEY")) { return getenv("GEARMAN_CLIENT_KEY"); @@ -236,6 +286,20 @@ return GEARMAN_CLIENT_KEY; } + void ssl_key(const char *ssl_key_) + { + gearman_string_free(options._ssl_key); + size_t ssl_key_size_= 0; + if (ssl_key_ && (ssl_key_size_= strlen(ssl_key_))) + { + options._ssl_key= gearman_string_create(NULL, ssl_key_, ssl_key_size_); + } + else + { + options._ssl_key= NULL; + } + } + private: bool init_ssl(); --- libgearman/protocol/submit.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/protocol/submit.cc 2014-08-02 19:43:57.085314194 +0800 @@ -113,6 +113,7 @@ gearman_return_t submit_epoch(gearman_universal_st& universal, gearman_packet_st& message, const gearman_unique_t& unique, + const gearman_command_t command, const gearman_string_t &function, const gearman_string_t &workload, time_t when) @@ -155,7 +156,7 @@ return gearman_packet_create_args(universal, message, GEARMAN_MAGIC_REQUEST, - GEARMAN_COMMAND_SUBMIT_JOB_EPOCH, + command, args, args_size, 4); } --- libgearman/protocol/submit.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/protocol/submit.h 2014-08-03 03:32:16.555202229 +0800 @@ -58,6 +58,7 @@ gearman_return_t submit_epoch(gearman_universal_st&, gearman_packet_st& message, const gearman_unique_t& unique, + const gearman_command_t command, const gearman_string_t &function, const gearman_string_t &workload, time_t when); --- libgearman/run.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/run.cc 2014-08-02 20:13:22.835301167 +0800 @@ -199,6 +199,8 @@ task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG || task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG || task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH || + task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH || + task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH || task->send.command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND) { task->error_code(GEARMAN_SUCCESS); --- libgearman/universal.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/universal.cc 2014-08-03 23:20:50.125334511 +0800 @@ -183,6 +183,15 @@ self.timeout= timeout; } +void gearman_universal_set_ssl(gearman_universal_st &self, bool ssl, + const char *ca_file, const char *certificate, const char *key_file) +{ + self.ssl(ssl); + self.ssl_ca_file(ca_file); + self.ssl_certificate(certificate); + self.ssl_key(key_file); +} + void gearman_set_log_fn(gearman_universal_st &self, gearman_log_fn *function, void *context, gearman_verbose_t verbose) { @@ -470,10 +479,31 @@ if (ssl()) { #if defined(HAVE_SSL) && HAVE_SSL + // Check these files exist or not to avoid coredump. + FILE *file= NULL; + if ((file= fopen(ssl_ca_file(), "r")) == NULL) + { + gearman_universal_set_error(*this, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT, "Failed to open CA certificate %s (%d: %s)", ssl_ca_file(), errno, strerror(errno)); + return false; + } + fclose(file); + if ((file= fopen(ssl_certificate(), "r")) == NULL) + { + gearman_universal_set_error(*this, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT, "Failed to open certificate %s (%d: %s)", ssl_certificate(), errno, strerror(errno)); + return false; + } + fclose(file); + if ((file= fopen(ssl_key(), "r")) == NULL) + { + gearman_universal_set_error(*this, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT, "Failed to open certificate key %s (%d: %s)", ssl_key(), errno, strerror(errno)); + return false; + } + fclose(file); + SSL_load_error_strings(); SSL_library_init(); - if ((_ctx_ssl= SSL_CTX_new(TLSv1_client_method())) == NULL) + if ((_ctx_ssl= SSL_CTX_new(SSLv23_client_method())) == NULL) { gearman_universal_set_error(*this, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT, "CyaTLSv1_client_method() failed"); return false; @@ -496,6 +526,12 @@ gearman_universal_set_error(*this, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT, "Failed to load certificate key %s", ssl_key()); return false; } + + if (SSL_CTX_check_private_key(_ctx_ssl) != SSL_SUCCESS) + { + gearman_universal_set_error(*this, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT, "Failed to check private key"); + return false; + } #endif // defined(HAVE_SSL) && HAVE_SSL } --- libgearman/universal.hpp 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/universal.hpp 2014-07-14 15:41:04.485239304 +0800 @@ -57,6 +57,9 @@ int gearman_universal_timeout(gearman_universal_st &self); +void gearman_universal_set_ssl(gearman_universal_st &self, bool ssl, + const char *ca_file, const char *certificate, const char *key_file); + void gearman_universal_set_namespace(gearman_universal_st &self, const char *namespace_key, size_t namespace_key_size); gearman_return_t cancel_job(gearman_universal_st& universal, --- libgearman/worker.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/worker.cc 2014-07-15 00:43:21.945244238 +0800 @@ -415,6 +415,15 @@ } } +void gearman_worker_set_ssl(gearman_worker_st *worker_shell, bool ssl, + const char *ca_file, const char *certificate, const char *key_file) +{ + if (worker_shell && worker_shell->impl()) + { + gearman_universal_set_ssl(worker_shell->impl()->universal, ssl, ca_file, certificate, key_file); + } +} + void *gearman_worker_context(const gearman_worker_st *worker) { if (worker and worker->impl()) --- libgearman/worker.hpp 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/worker.hpp 2014-07-07 14:23:58.669334929 +0800 @@ -134,6 +134,7 @@ void enable_ssl() { + return; if (getenv("GEARMAND_CA_CERTIFICATE")) { gearman_worker_add_options(_worker, GEARMAN_WORKER_SSL); --- libgearman-1.0/client.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-1.0/client.h 2014-08-06 11:04:56.002013287 +0800 @@ -172,6 +172,13 @@ void gearman_client_set_timeout(gearman_client_st *client, int timeout); /** + * See gearman_universal_set_ssl() for details. + */ +GEARMAN_API +void gearman_client_set_ssl(gearman_client_st *client, bool ssl, + const char *ca_file, const char *certificate, const char *key_file); + +/** * Get the application context for a client. * * @param[in] client Structure previously initialized with @@ -397,7 +404,56 @@ gearman_job_handle_t job_handle); /** - * Get the status for a backgound job. + * Run a task in the future. + * + * @param[in] client Structure previously initialized with + * gearman_client_create() or gearman_client_clone(). + * @param[in] function_name The name of the function to run. + * @param[in] unique Optional unique job identifier, or NULL for a new UUID. + * @param[in] workload The workload to pass to the function when it is run. + * @param[in] workload_size Size of the workload. + * @param[in] when The Time when to run. + * @param[out] job_handle A buffer to store the job handle in. Must be at least + GEARMAN_JOB_HANDLE_SIZE bytes long. + * @return Standard gearman return value. + */ +GEARMAN_API +gearman_return_t gearman_client_do_epoch(gearman_client_st *client, + const char *function_name, + const char *unique, + const void *workload, + size_t workload_size, + time_t when, + gearman_job_handle_t job_handle); + +/** + * Run a high priority task in the future. See + * gearman_client_do_epoch() for parameter and return information. + */ +GEARMAN_API +gearman_return_t gearman_client_do_high_epoch(gearman_client_st *client, + const char *function_name, + const char *unique, + const void *workload, + size_t workload_size, + time_t when, + gearman_job_handle_t job_handle); + +/** + * Run a low priority task in the future. See + * gearman_client_do_epoch() for parameter and return information. + */ +GEARMAN_API +gearman_return_t gearman_client_do_low_epoch(gearman_client_st *client, + const char *function_name, + const char *unique, + const void *workload, + size_t workload_size, + time_t when, + gearman_job_handle_t job_handle); + +/** + * Get the status for a backgound or future job. * * @param[in] client Structure previously initialized with * gearman_client_create() or gearman_client_clone(). @@ -570,6 +626,54 @@ gearman_return_t *ret_ptr); /** + * Add an epoch task to be run in parallel. See + * gearman_client_add_task() for details. + * @param[in] when The time when to run. + */ +GEARMAN_API +gearman_task_st *gearman_client_add_task_epoch(gearman_client_st *client, + gearman_task_st *task, + void *context, + const char *function_name, + const char *unique, + const void *workload, + size_t workload_size, + time_t when, + gearman_return_t *ret_ptr); + +/** + * Add an epoch task to be run in parallel. See + * gearman_client_add_task() for details. + * @param[in] when The time when to run. + */ +GEARMAN_API +gearman_task_st *gearman_client_add_task_high_epoch(gearman_client_st *client, + gearman_task_st *task, + void *context, + const char *function_name, + const char *unique, + const void *workload, + size_t workload_size, + time_t when, + gearman_return_t *ret_ptr); + +/** + * Add an epoch task to be run in parallel. See + * gearman_client_add_task() for details. + * @param[in] when The time when to run. + */ +GEARMAN_API +gearman_task_st *gearman_client_add_task_low_epoch(gearman_client_st *client, + gearman_task_st *task, + void *context, + const char *function_name, + const char *unique, + const void *workload, + size_t workload_size, + time_t when, + gearman_return_t *ret_ptr); + +/** * Add task to get the status for a backgound task in parallel. * * @param[in] client Structure previously initialized with --- libgearman-1.0/protocol.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-1.0/protocol.h 2014-08-02 19:19:49.235324876 +0800 @@ -87,6 +87,8 @@ GEARMAN_COMMAND_JOB_ASSIGN_ALL, /* J->W: HANDLE[0]FUNC[0]UNIQ[0]REDUCER[0]ARGS */ GEARMAN_COMMAND_GET_STATUS_UNIQUE, /* C->J: UNIQUE */ GEARMAN_COMMAND_STATUS_RES_UNIQUE, /* J->C: UNIQUE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM[0]CLIENT_COUNT */ + GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH, + GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH, GEARMAN_COMMAND_MAX /* Always add new commands before this. */ }; --- libgearman-1.0/worker.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-1.0/worker.h 2014-07-15 00:41:40.291909980 +0800 @@ -180,6 +180,13 @@ void gearman_worker_set_timeout(gearman_worker_st *worker, int timeout); /** + * See gearman_universal_set_ssl() for details. + */ +GEARMAN_API +void gearman_worker_set_ssl(gearman_worker_st *worker, bool ssl, + const char *ca_file, const char *certificate, const char *key_file); + +/** * Get the application context for a worker. * * @param[in] worker Structure previously initialized with --- libgearman-server/function.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/function.cc 2014-08-20 04:12:55.060829838 +0800 @@ -105,6 +105,14 @@ function->function_name[function_name_size]= 0; function->function_name_size= function_name_size; function->worker_list= NULL; + memset(function->job_epoch_list, 0, + sizeof(gearman_server_job_st *) * GEARMAN_JOB_PRIORITY_MAX); + memset(function->job_epoch_end, 0, + sizeof(gearman_server_job_st *) * GEARMAN_JOB_PRIORITY_MAX); + memset(function->job_bg_list, 0, + sizeof(gearman_server_job_st *) * GEARMAN_JOB_PRIORITY_MAX); + memset(function->job_bg_end, 0, + sizeof(gearman_server_job_st *) * GEARMAN_JOB_PRIORITY_MAX); memset(function->job_list, 0, sizeof(gearman_server_job_st *) * GEARMAN_JOB_PRIORITY_MAX); memset(function->job_end, 0, @@ -113,10 +121,10 @@ return function; } -gearman_server_function_st * -gearman_server_function_get(gearman_server_st *server, - const char *function_name, - size_t function_name_size) +gearman_server_function_st *gearman_server_function_get(gearman_server_st *server, + const char *function_name, + size_t function_name_size, + bool auto_create) { gearman_server_function_st *function; @@ -131,6 +139,11 @@ } } + if (auto_create == false) + { + return NULL; + } + return gearman_server_function_create(server, function_name, function_name_size, function_hash); } --- libgearman-server/function.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/function.h 2014-08-20 04:12:49.557496743 +0800 @@ -63,9 +63,10 @@ Add a new function to a server instance. */ GEARMAN_API - gearman_server_function_st * gearman_server_function_get(gearman_server_st *server, - const char *function_name, - size_t function_name_size); +gearman_server_function_st * gearman_server_function_get(gearman_server_st *server, + const char *function_name, + size_t function_name_size, + bool auto_create= true); /** * Free a server function structure. --- libgearman-server/gearmand.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/gearmand.cc 2014-08-12 16:14:17.865396910 +0800 @@ -175,6 +175,13 @@ delete worker; } + gearman_server_epoch_job_st *epoch_job= NULL; + while ((epoch_job= server.epoch_job_list) != NULL) + { + server.epoch_job_list= epoch_job->next; + gearman_server_epoch_job_free(epoch_job); + } + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "removing queue: %s", (server.queue_version == QUEUE_VERSION_CLASS) ? "CLASS" : "FUNCTION"); if (server.queue_version == QUEUE_VERSION_CLASS) { @@ -1202,6 +1209,7 @@ uint32_t hashtable_buckets) { server.state.queue_startup= false; + server.state.epoch_startup= false; server.flags.round_robin= round_robin_arg; server.flags.threaded= false; server.shutdown= false; @@ -1223,6 +1231,7 @@ server.free_job_list= NULL; server.free_client_list= NULL; server.free_worker_list= NULL; + server.epoch_job_list= NULL; server.queue_version= QUEUE_VERSION_NONE; server.queue.object= NULL; --- libgearman-server/gearmand_con.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/gearmand_con.cc 2014-08-20 10:15:43.628600003 +0800 @@ -210,8 +210,8 @@ bool(server_job->unique[0]) ? server_job->unique : "", uint32_t(strlen(server_job->unique)), unique, uint32_t(unique_length)); - if (bool(server_job->unique[0]) and - (strcmp(server_job->unique, unique) == 0)) + if (server_job->unique_key == key and server_job->unique_length == unique_length + and strncmp(server_job->unique, unique, unique_length) == 0) { /* Check to make sure the worker asking for the job still owns the job. */ if (worker_con != NULL and @@ -237,8 +237,8 @@ for (gearman_server_job_st *server_job= server->job_hash[key % server->hashtable_buckets]; server_job != NULL; server_job= server_job->next) { - if (server_job->job_handle_key == key and - strncmp(server_job->job_handle, job_handle, GEARMAND_JOB_HANDLE_SIZE) == 0) + if (server_job->job_handle_key == key and strlen(server_job->job_handle) == job_handle_length + and strncmp(server_job->job_handle, job_handle, job_handle_length) == 0) { /* Check to make sure the worker asking for the job still owns the job. */ if (worker_con != NULL and @@ -254,59 +254,62 @@ return NULL; } -gearmand_error_t gearman_server_job_cancel(gearman_server_st& server, - const char *job_handle, - const size_t job_handle_length) +gearmand_error_t _gearman_server_job_cancel(gearman_server_job_st *server_job) { - gearmand_error_t ret= GEARMAND_NO_JOBS; - uint32_t key= _server_job_hash(job_handle, job_handle_length); + if (server_job == NULL) + { + return GEARMAND_NO_JOBS; + } - gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "cancel: %.*s", int(job_handle_length), job_handle); + /* Queue the fail packet for all clients. */ + for (gearman_server_client_st* client= server_job->client_list; client != NULL; client= client->job_next) + { + gearmand_error_t ret= gearman_server_io_packet_add(client->con, false, + GEARMAN_MAGIC_RESPONSE, + GEARMAN_COMMAND_WORK_FAIL, + server_job->job_handle, + (size_t)strlen(server_job->job_handle), + NULL); + if (gearmand_failed(ret)) + { + gearmand_log_gerror_warn(GEARMAN_DEFAULT_LOG_PARAM, ret, "Failed to send WORK_FAIL packet to %s:%s", client->con->host(), client->con->port()); + } + } - for (gearman_server_job_st *server_job= server.job_hash[key % server.hashtable_buckets]; - server_job != NULL; - server_job= server_job->next) + /* Remove from persistent queue if one exists. */ + if (server_job->job_queued) { - if (server_job->job_handle_key == key and - strncmp(server_job->job_handle, job_handle, GEARMAND_JOB_HANDLE_SIZE) == 0) + gearmand_error_t ret= gearman_queue_done(Server, + server_job->unique, + server_job->unique_length, + server_job->function->function_name, + server_job->function->function_name_size); + if (gearmand_failed(ret)) { - /* Queue the fail packet for all clients. */ - for (gearman_server_client_st* client= server_job->client_list; client != NULL; client= client->job_next) - { - ret= gearman_server_io_packet_add(client->con, false, - GEARMAN_MAGIC_RESPONSE, - GEARMAN_COMMAND_WORK_FAIL, - server_job->job_handle, - (size_t)strlen(server_job->job_handle), - NULL); - if (gearmand_failed(ret)) - { - gearmand_log_gerror_warn(GEARMAN_DEFAULT_LOG_PARAM, ret, "Failed to send WORK_FAIL packet to %s:%s", client->con->host(), client->con->port()); - } - } + return gearmand_gerror("Remove from persistent queue", ret); + } + } - /* Remove from persistent queue if one exists. */ - if (server_job->job_queued) - { - ret= gearman_queue_done(Server, - server_job->unique, - server_job->unique_length, - server_job->function->function_name, - server_job->function->function_name_size); - if (gearmand_failed(ret)) - { - return gearmand_gerror("Remove from persistent queue", ret); - } - } + server_job->ignore_job= true; + server_job->job_queued= false; - server_job->ignore_job= true; - server_job->job_queued= false; + return GEARMAND_SUCCESS; +} - return GEARMAND_SUCCESS; - } - } +gearmand_error_t gearman_server_job_cancel_by_unique(gearman_server_st& server, + const char *unique, + const size_t unique_length) +{ + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "cancel: %.*s", unique_length, unique); + return _gearman_server_job_cancel(gearman_server_job_get_by_unique(&server, unique, unique_length, NULL)); +} - return ret; +gearmand_error_t gearman_server_job_cancel(gearman_server_st& server, + const char *job_handle, + const size_t job_handle_length) +{ + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "cancel: %.*s", job_handle_length, job_handle); + return _gearman_server_job_cancel(gearman_server_job_get(&server, job_handle, job_handle_length, NULL)); } gearman_server_job_st * gearman_server_job_peek(gearman_server_con_st *server_con) @@ -315,35 +318,38 @@ server_worker != NULL; server_worker= server_worker->con_next) { - if (server_worker->function->job_count != 0) + if (server_worker->function != NULL and server_worker->function->job_count != 0) { for (gearman_job_priority_t priority= GEARMAN_JOB_PRIORITY_HIGH; priority != GEARMAN_JOB_PRIORITY_MAX; priority= gearman_job_priority_t(int(priority) +1)) { - gearman_server_job_st *server_job; - server_job= server_worker->function->job_list[priority]; - - int64_t current_time= (int64_t)time(NULL); - - while(server_job && - server_job->when != 0 && - server_job->when > current_time) + gearman_server_job_st *server_job= NULL; + if (server_job == NULL) { - server_job= server_job->function_next; + server_job= server_worker->function->job_epoch_list[priority]; + if (server_job != NULL and server_job->when > time(NULL)) + { + server_job= NULL; + } } - - if (server_job != NULL) + if (server_job == NULL) { + server_job= server_worker->function->job_list[priority]; + } + if (server_job == NULL) + { + server_job= server_worker->function->job_bg_list[priority]; + } + if (server_job != NULL) + { if (server_job->ignore_job) { /* This is only happens when a client disconnects from a foreground job. We do this because we don't want to run the job anymore. */ - server_job->ignore_job= false; - - gearman_server_job_free(gearman_server_job_take(server_con)); - + server_job->function->job_count--; + gearman_server_job_free(server_job); return gearman_server_job_peek(server_con); } @@ -352,16 +358,15 @@ } } } - return NULL; } gearman_server_job_st *gearman_server_job_take(gearman_server_con_st *server_con) { - for (gearman_server_worker_st *server_worker= server_con->worker_list; server_worker; server_worker= server_worker->con_next) + for (gearman_server_worker_st *server_worker= server_con->worker_list; server_worker != NULL; server_worker= server_worker->con_next) { - if (server_worker->function and server_worker->function->job_count) + if (server_worker->function != NULL and server_worker->function->job_count) { gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Jobs available for %.*s: %lu", (int)server_worker->function->function_name_size, server_worker->function->function_name, @@ -378,47 +383,49 @@ } } + gearman_server_job_st *server_job= NULL; gearman_job_priority_t priority; - for (priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_LOW; - priority= gearman_job_priority_t(int(priority) +1)) + if (server_job == NULL) { - if (server_worker->function->job_list[priority]) + for (priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_MAX; + priority= gearman_job_priority_t(int(priority) +1)) { - break; + if ((server_job= server_worker->function->job_epoch_list[priority]) != NULL and + ((server_job->when <= time(NULL)) or (server_job= NULL))) + { + server_job->function->job_epoch_list[priority]= server_job->function_next; + break; + } } } - - gearman_server_job_st *server_job= server_worker->function->job_list[priority]; - gearman_server_job_st *previous_job= server_job; - - int64_t current_time= (int64_t)time(NULL); - - while (server_job and server_job->when != 0 and server_job->when > current_time) + if (server_job == NULL) { - previous_job= server_job; - server_job= server_job->function_next; - } - - if (server_job) - { - if (server_job->function->job_list[priority] == server_job) - { - // If it's the head of the list, advance it - server_job->function->job_list[priority]= server_job->function_next; - } - else + for (priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_MAX; + priority= gearman_job_priority_t(int(priority) +1)) { - // Otherwise, just remove the item from the list - previous_job->function_next= server_job->function_next; + if ((server_job= server_worker->function->job_list[priority]) != NULL) + { + server_job->function->job_list[priority]= server_job->function_next; + break; + } } - - // If it's the tail of the list, move the tail back - if (server_job->function->job_end[priority] == server_job) + } + if (server_job == NULL) + { + for (priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_MAX; + priority= gearman_job_priority_t(int(priority) +1)) { - server_job->function->job_end[priority]= previous_job; + if ((server_job= server_worker->function->job_bg_list[priority]) != NULL) + { + server_job->function->job_bg_list[priority]= server_job->function_next; + break; + } } - server_job->function->job_count--; + } + if (server_job != NULL) + { + server_job->function->job_count--; server_job->worker= server_worker; GEARMAND_LIST_ADD(server_worker->job, server_job, worker_); server_job->function->job_running++; --- libgearman-server/gearmand_con.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/gearmand_con.h 2014-08-19 21:56:03.535139933 +0800 @@ -78,11 +78,18 @@ const char *host, const char*, struct gearmand_port_st*); +gearmand_error_t _gearman_server_job_cancel(gearman_server_job_st *server_job); + GEARMAN_API gearmand_error_t gearman_server_job_cancel(gearman_server_st& server, const char *job_handle, const size_t job_handle_length); +GEARMAN_API +gearmand_error_t gearman_server_job_cancel_by_unique(gearman_server_st& server, + const char *job_handle, + const size_t job_handle_length); + /** * Free resources used by a connection. * @param dcon Connection previously initialized with gearmand_con_create. --- libgearman-server/io.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/io.cc 2014-08-12 17:30:28.968771823 +0800 @@ -159,6 +159,10 @@ case SSL_ERROR_SSL: default: { // All other errors + if (ERR_peek_last_error()) + { + ssl_error= ERR_peek_last_error(); + } char errorString[SSL_ERROR_SIZE]; ERR_error_string_n(ssl_error, errorString, sizeof(errorString)); ret= GEARMAND_LOST_CONNECTION; @@ -338,6 +342,10 @@ case SSL_ERROR_SSL: default: { + if (ERR_peek_last_error()) + { + ssl_error= ERR_peek_last_error(); + } char errorString[SSL_ERROR_SIZE]; ERR_error_string_n(ssl_error, errorString, sizeof(errorString)); _connection_close(connection); @@ -606,7 +614,11 @@ { return GEARMAND_SUCCESS; } - else if (ret != GEARMAND_FLUSH_DATA) + else if (ret == GEARMAND_FLUSH_DATA) + { + connection->send_buffer_size+= send_size; + } + else { return ret; } @@ -801,8 +813,9 @@ connection->recv_buffer_size+= recv_size; } - if (packet->data_size == 0) + if (packet->data_size == 0 or packet->data_size <= connection->recv_data_offset) { + connection->recv_data_offset= 0; connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE; break; } @@ -815,17 +828,20 @@ break; } - packet->data= static_cast(realloc(NULL, packet->data_size)); - if (not packet->data) { - // Server up the memory error first, in case _connection_close() - // creates any. - gearmand_merror("realloc", char, packet->data_size); - _connection_close(connection); - return GEARMAND_MEMORY_ALLOCATION_FAILURE; + char *new_data= static_cast(realloc((char *)packet->data, packet->data_size)); + if (new_data == NULL) + { + // Server up the memory error first, in case _connection_close() + // creates any. + gearmand_merror("realloc", char, packet->data_size); + _connection_close(connection); + return GEARMAND_MEMORY_ALLOCATION_FAILURE; + } + packet->data= new_data; + packet->options.free_data= true; } - packet->options.free_data= true; connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA; case gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA: --- libgearman-server/job.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/job.cc 2014-08-19 09:56:15.225300295 +0800 @@ -46,6 +46,7 @@ #include "gear_config.h" #include "libgearman-server/common.h" #include +#include #include @@ -339,7 +340,7 @@ gearmand_error_t gearman_server_job_queue(gearman_server_job_st *job) { - if (job->worker) + if (job->worker != NULL) { job->retries++; if (Server->job_retries != 0 && Server->job_retries == job->retries) @@ -389,7 +390,7 @@ } /* Queue NOOP for possible sleeping workers. */ - if (job->function->worker_list != NULL) + if (job->when <= time(NULL) and job->function->worker_list != NULL) { gearman_server_worker_st *worker= job->function->worker_list; uint32_t noop_sent= 0; @@ -422,18 +423,163 @@ } /* Queue the job to be run. */ - if (job->function->job_list[job->priority] == NULL) + if (job->job_queued == false) { - job->function->job_list[job->priority]= job; + if (job->function->job_list[job->priority] == NULL) + { + job->function->job_list[job->priority]= job; + } + else + { + job->function->job_end[job->priority]->function_next= job; + } + job->function->job_end[job->priority]= job; + } + else if (job->when == 0) + { + if (job->function->job_bg_list[job->priority] == NULL) + { + job->function->job_bg_list[job->priority]= job; + } + else + { + job->function->job_bg_end[job->priority]->function_next= job; + } + job->function->job_bg_end[job->priority]= job; } else { - job->function->job_end[job->priority]->function_next= job; + gearman_server_job_st *job0= NULL, *job1= job->function->job_epoch_list[job->priority]; + for (; job1 != NULL and job1->when <= job->when; job0= job1, job1= job1->function_next); + if (job0 == NULL) + { + job->function->job_epoch_list[job->priority]= job; + } + else + { + job0->function_next= job; + } + if (job1 == NULL) + { + job->function->job_epoch_end[job->priority]= job; + } + else + { + job->function_next= job1; + } + + if (job->when > time(NULL)) + { + gearman_server_epoch_job_push(job->when, job->function); + } } - job->function->job_end[job->priority]= job; job->function->job_count++; return GEARMAND_SUCCESS; } #pragma GCC diagnostic pop + +void gearman_server_epoch_job_free(gearman_server_epoch_job_st *epoch_job) +{ + if (epoch_job == NULL) + { + return; + } + gearman_server_epoch_function_st *epoch_function= NULL; + while ((epoch_function= epoch_job->function_list) != NULL) + { + epoch_job->function_list= epoch_function->next; + delete epoch_function; + } + delete epoch_job; +} + +void gearman_server_epoch_job_push(int64_t when, gearman_server_function_st *function) +{ + gearman_server_st *server= gearmand_server(Gearmand()); + if (!server->state.epoch_startup) + { + signal(SIGALRM, gearman_server_epoch_job_pop); + server->state.epoch_startup= true; + } + + gearman_server_epoch_job_st *epoch_job= NULL, *epoch_job0= NULL, *epoch_job1= server->epoch_job_list; + for (; epoch_job1 != NULL and epoch_job1->when <= when; epoch_job0= epoch_job1, epoch_job1= epoch_job1->next); + if (epoch_job0 == NULL or epoch_job0->when < when) + { + epoch_job= new gearman_server_epoch_job_st; + epoch_job->when= when; + epoch_job->next= epoch_job1; + epoch_job->function_list= NULL; + if (epoch_job0 == NULL) + { + server->epoch_job_list= epoch_job; + alarm((epoch_job->when > time(NULL)) ? (epoch_job->when -time(NULL)) : 0); + } + else + { + epoch_job0->next= epoch_job; + } + } + else + { + epoch_job= epoch_job0; + } + + gearman_server_epoch_function_st *epoch_function= NULL, *epoch_function1= epoch_job->function_list; + for (; epoch_function1 != NULL and epoch_function1->function != function; epoch_function1= epoch_function1->next); + if (epoch_function1 == NULL) + { + epoch_function= new gearman_server_epoch_function_st; + epoch_function->function= function; + epoch_function->next= epoch_job->function_list; + epoch_job->function_list= epoch_function; + } +} + +void gearman_server_epoch_job_pop(int) +{ + gearman_server_worker_st *worker= NULL; + gearman_server_epoch_job_st *epoch_job= NULL; + gearman_server_epoch_function_st *epoch_function= NULL; + gearman_server_st *server= gearmand_server(Gearmand()); + while ((epoch_job= server->epoch_job_list) != NULL and epoch_job->when <= time(NULL)) + { + while ((epoch_function= epoch_job->function_list) != NULL) + { + if ((worker= epoch_function->function->worker_list) != NULL) + { + do + { + if (worker->con->is_sleeping and !worker->con->is_noop_sent) + { + gearmand_error_t ret= gearman_server_io_packet_add(worker->con, false, + GEARMAN_MAGIC_RESPONSE, + GEARMAN_COMMAND_NOOP, NULL); + if (gearmand_failed(ret)) + { + gearmand_log_gerror_warn(GEARMAN_DEFAULT_LOG_PARAM, ret, "Failed to send NOOP packet to %s:%s", worker->con->host(), worker->con->port()); + } + else + { + worker->con->is_noop_sent= true; + } + } + } + while ((worker= worker->function_next) != NULL and worker != epoch_function->function->worker_list); + } + + epoch_job->function_list= epoch_function->next; + delete epoch_function; + } + + server->epoch_job_list= epoch_job->next; + delete epoch_job; + } + + if (epoch_job != NULL) + { + alarm((epoch_job->when > time(NULL)) ? (epoch_job->when -time(NULL)) : 0); + } +} --- libgearman-server/job.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/job.h 2014-08-04 21:11:15.388735788 +0800 @@ -128,6 +128,12 @@ GEARMAN_API gearmand_error_t gearman_server_job_queue(gearman_server_job_st *server_job); +void gearman_server_epoch_job_free(gearman_server_epoch_job_st*); + +void gearman_server_epoch_job_push(int64_t, gearman_server_function_st*); + +void gearman_server_epoch_job_pop(int); + uint32_t _server_job_hash(const char *key, size_t key_size); void *_proc(void *data); --- libgearman-server/plugins/base.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/plugins/base.h 2014-08-12 09:52:22.425352472 +0800 @@ -183,6 +183,12 @@ return true; } + // If the protocol only has one response to client + virtual bool is_response_once() const + { + return false; + } + // Notify on disconnect virtual void notify(gearman_server_con_st*) { --- libgearman-server/plugins/protocol/gear/protocol.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/plugins/protocol/gear/protocol.cc 2014-08-12 09:47:25.845349775 +0800 @@ -393,6 +393,10 @@ case SSL_ERROR_SSL: case SSL_ERROR_ZERO_RETURN: default: + if (ERR_peek_last_error()) + { + cyassl_error= ERR_peek_last_error(); + } char cyassl_error_buffer[SSL_ERROR_SIZE]= { 0 }; ERR_error_string_n(cyassl_error, cyassl_error_buffer, sizeof(cyassl_error_buffer)); return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_LOST_CONNECTION, "%s(%d)", @@ -418,9 +422,9 @@ Gear::Gear() : Plugin("Gear"), _port(GEARMAN_DEFAULT_TCP_PORT_STRING), - _ssl_ca_file(GEARMAND_CA_CERTIFICATE), - _ssl_certificate(GEARMAND_SERVER_PEM), - _ssl_key(GEARMAND_SERVER_KEY), + _ssl_ca_file(""), + _ssl_certificate(""), + _ssl_key(""), opt_ssl(false) { command_line_options().add_options() @@ -477,19 +481,40 @@ if (opt_ssl) { - if (getenv("GEARMAND_CA_CERTIFICATE")) + if (_ssl_ca_file.empty()) { - _ssl_ca_file= getenv("GEARMAND_CA_CERTIFICATE"); + if (getenv("GEARMAND_CA_CERTIFICATE")) + { + _ssl_ca_file= getenv("GEARMAND_CA_CERTIFICATE"); + } + else + { + _ssl_ca_file= GEARMAND_CA_CERTIFICATE; + } } - if (getenv("GEARMAND_SERVER_PEM")) + if (_ssl_certificate.empty()) { - _ssl_certificate= getenv("GEARMAND_SERVER_PEM"); + if (getenv("GEARMAND_SERVER_PEM")) + { + _ssl_certificate= getenv("GEARMAND_SERVER_PEM"); + } + else + { + _ssl_certificate= GEARMAND_SERVER_PEM; + } } - if (getenv("GEARMAND_SERVER_KEY")) + if (_ssl_key.empty()) { - _ssl_key= getenv("GEARMAND_SERVER_KEY"); + if (getenv("GEARMAND_SERVER_KEY")) + { + _ssl_key= getenv("GEARMAND_SERVER_KEY"); + } + else + { + _ssl_key= GEARMAND_SERVER_KEY; + } } gearmand->init_ssl(); @@ -512,6 +537,12 @@ } gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Loading certificate key : %s", _ssl_key.c_str()); + if (SSL_CTX_check_private_key(gearmand->ctx_ssl()) != SSL_SUCCESS) + { + gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "SSL_CTX_check_private_key() cannot check certificate %s", _ssl_key.c_str()); + } + gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Checking certificate key : %s", _ssl_key.c_str()); + assert(gearmand->ctx_ssl()); } #endif --- libgearman-server/plugins/protocol/http/protocol.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/plugins/protocol/http/protocol.cc 2014-08-12 17:29:45.585438095 +0800 @@ -73,10 +73,27 @@ class HTTPtext : public gearmand::protocol::Context { public: + struct job_content_st { + char *content; + size_t content_size; + size_t sent_offset; + job_content_st *next; + }; + struct job_contents_st { + char *handle; + size_t handle_size; + bool sent_header; + job_content_st *content_list; + job_content_st *content_end; + size_t content_size; + job_contents_st *prev; + job_contents_st *next; + }; HTTPtext() : + contents_list(NULL), + contents_end(NULL), _method(gearmand::protocol::httpd::TRACE), - _sent_header(false), _background(false), _keep_alive(false), _http_response(gearmand::protocol::httpd::HTTP_OK) @@ -84,7 +101,27 @@ } ~HTTPtext() - { } + { + job_content_st *content= NULL; + job_contents_st *contents= NULL; + while ((contents= contents_list) != NULL) + { + while ((content= contents->content_list) != NULL) + { + contents->content_list= content->next; + delete []content->content; + delete content; + } + contents_list= contents->next; + delete []contents->handle; + delete contents; + } + } + + bool is_response_once() const + { + return true; + } void notify(gearman_server_con_st*) { @@ -96,21 +133,112 @@ void *send_buffer, const size_t send_buffer_size, gearmand_error_t& ret_ptr) { + job_content_st *content= NULL; + job_contents_st *contents= NULL; + switch (packet->command) { + case GEARMAN_COMMAND_JOB_CREATED: + break; + + case GEARMAN_COMMAND_WORK_COMPLETE: + for (contents= contents_list; contents != NULL; contents= contents->next) + { + if (contents->handle_size == packet->arg_size[0] and + memcmp(contents->handle, packet->arg[0], contents->handle_size) == 0) + { + break; + } + } + break; + + case GEARMAN_COMMAND_WORK_FAIL: + case GEARMAN_COMMAND_ECHO_RES: + break; + case GEARMAN_COMMAND_WORK_DATA: + for (contents= contents_list; contents != NULL; contents= contents->next) + { + if (contents->handle_size == packet->arg_size[0] and + memcmp(contents->handle, packet->arg[0], contents->handle_size) == 0) + { + break; + } + } + if (contents == NULL) { - for (const char *ptr= packet->data; ptr <= (packet->data +packet->data_size) -2; ptr++) + if ((contents= new job_contents_st) == NULL) { - content.push_back(*ptr); + gearmand_merror("new job_contents_st", job_contents_st, 0); + ret_ptr= GEARMAND_MEMORY_ALLOCATION_FAILURE; + return 0; } + contents->handle= NULL; + contents->handle_size= packet->arg_size[0]; + contents->sent_header= false; + contents->content_list= NULL; + contents->content_end= NULL; + contents->content_size= 0; + contents->prev= NULL; + contents->next= NULL; - gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "HTTP gearmand_command_t: GEARMAN_COMMAND_WORK_DATA length:%" PRIu64, uint64_t(content.size())); - ret_ptr= GEARMAND_IGNORE_PACKET; - return 0; + if ((contents->handle= new char[contents->handle_size]) == NULL) + { + gearmand_merror("new char[]", char, contents->handle_size); + ret_ptr= GEARMAND_MEMORY_ALLOCATION_FAILURE; + delete contents; + return 0; + } + memcpy(contents->handle, packet->arg[0], contents->handle_size); + + if (contents_list == NULL) + { + contents_end= contents_list= contents; + } + else + { + contents_end->next= contents; + contents->prev= contents_end; + contents_end= contents; + } } + if (packet->data_size > 0) + { + if ((content= new job_content_st) == NULL) + { + gearmand_merror("new job_content_st", job_content_st, 0); + ret_ptr= GEARMAND_MEMORY_ALLOCATION_FAILURE; + return 0; + } + content->content= NULL; + content->content_size= packet->data_size; + content->sent_offset= 0; + content->next= NULL; + + if ((content->content= new char[content->content_size]) == NULL) + { + gearmand_merror("new char[]", char, content->content_size); + ret_ptr= GEARMAND_MEMORY_ALLOCATION_FAILURE; + delete content; + return 0; + } + memcpy(content->content, packet->data, content->content_size); + contents->content_size+= content->content_size; + + if (contents->content_list == NULL) + { + contents->content_end= contents->content_list= content; + } + else + { + contents->content_end->next= content; + contents->content_end= content; + } + } + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "HTTP gearmand_command_t: GEARMAN_COMMAND_WORK_DATA length:%" PRIu64 ", total length:%" PRIu64, (uint64_t)content->content_size, (uint64_t)contents->content_size); + ret_ptr= GEARMAND_IGNORE_PACKET; + return 0; - default: case GEARMAN_COMMAND_TEXT: case GEARMAN_COMMAND_CAN_DO: case GEARMAN_COMMAND_CANT_DO: @@ -149,35 +277,25 @@ case GEARMAN_COMMAND_JOB_ASSIGN_ALL: case GEARMAN_COMMAND_GET_STATUS_UNIQUE: case GEARMAN_COMMAND_STATUS_RES_UNIQUE: + case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH: + case GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH: case GEARMAN_COMMAND_MAX: + default: gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Bad packet command: gearmand_command_t:%s", gearman_strcommand(packet->command)); assert(0); - case GEARMAN_COMMAND_WORK_FAIL: - case GEARMAN_COMMAND_ECHO_RES: - case GEARMAN_COMMAND_WORK_COMPLETE: - break; - - case GEARMAN_COMMAND_JOB_CREATED: - { - gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, - "Sending HTTP told to ignore packet: gearmand_command_t:%s", - gearman_strcommand(packet->command)); - ret_ptr= GEARMAND_IGNORE_PACKET; - return 0; - } } gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Sending HTTP response: Content-length:%" PRIu64 " data_size:%" PRIu64 " gearmand_command_t:%s response:%s", - uint64_t(content.size()), - uint64_t(packet->data_size), + (uint64_t)((contents == NULL ? 0 : contents->content_size) +packet->data_size), + (uint64_t)packet->data_size, gearman_strcommand(packet->command), gearmand::protocol::httpd::response(response())); size_t pack_size= 0; - if (_sent_header == false) + if (contents == NULL or contents->sent_header == false) { if (response() != gearmand::protocol::httpd::HTTP_OK) { @@ -198,7 +316,7 @@ "\r\n", packet->command == GEARMAN_COMMAND_JOB_CREATED ? (int)packet->arg_size[0] : (int)packet->arg_size[0] - 1, (const char *)packet->arg[0], - (uint64_t)packet->data_size); + (uint64_t)((contents == NULL ? 0 : contents->content_size) +packet->data_size)); } else if (method() == gearmand::protocol::httpd::TRACE) { @@ -215,14 +333,13 @@ "HTTP/1.0 200 OK\r\n" "X-Gearman-Job-Handle: %.*s\r\n" "X-Gearman-Command: %s\r\n" - "Content-Length: %d\r\n" + "Content-Length: %" PRIu64 "\r\n" "Server: Gearman/" PACKAGE_VERSION "\r\n" - "\r\n%.*s", + "\r\n", packet->command == GEARMAN_COMMAND_JOB_CREATED ? int(packet->arg_size[0]) : int(packet->arg_size[0] - 1), (const char *)packet->arg[0], // Job handle gearman_strcommand(packet->command), - int(content.size()), // Content-length - int(content.size()), &content[0]); + (uint64_t)((contents == NULL ? 0 : contents->content_size) +packet->data_size)); // Content-length } else { @@ -236,10 +353,8 @@ packet->command == GEARMAN_COMMAND_JOB_CREATED ? int(packet->arg_size[0]) : int(packet->arg_size[0] - 1), (const char *)packet->arg[0], gearman_strcommand(packet->command), - uint64_t(content.size())); + (uint64_t)((contents == NULL ? 0 : contents->content_size) +packet->data_size)); } - - _sent_header= true; } if (pack_size > send_buffer_size) @@ -249,12 +364,54 @@ return 0; } - memcpy(send_buffer, &content[0], content.size()); - pack_size+= content.size(); + if (contents != NULL) + { + contents->sent_header= true; + size_t content_size= 0; + while ((content= contents->content_list) != NULL) + { + content_size= content->content_size -content->sent_offset; + if (content_size > send_buffer_size -pack_size) + { + content_size= send_buffer_size -pack_size; + } + if (content_size > 0) + { + memcpy((char *)send_buffer +pack_size, content->content +content->sent_offset, content_size); + content->sent_offset+= content_size; + pack_size+= content_size; + } + if (content->sent_offset < content->content_size) + { + gearmand_debug("Sending HTTP had to flush"); + ret_ptr= GEARMAND_FLUSH_DATA; + return pack_size; + } + contents->content_list= content->next; + delete []content->content; + delete content; + } + if (contents == contents_list) + { + contents_list= contents->next; + } + if (contents == contents_end) + { + contents_end= contents->prev; + } + if (contents->prev != NULL) + { + contents->prev->next= contents->next; + } + if (contents->next != NULL) + { + contents->next->prev= contents->prev; + } + delete []contents->handle; + delete contents; + } -#if 0 if (keep_alive() == false) -#endif { gearman_io_set_option(&connection->con, GEARMAND_CON_CLOSE_AFTER_FLUSH, true); } @@ -265,12 +422,15 @@ } size_t unpack(gearmand_packet_st *packet, - gearman_server_con_st *, //connection + gearman_server_con_st *connection, const void *data, const size_t data_size, gearmand_error_t& ret_ptr) { + packet->data_size= 0; const char *unique= "-"; - size_t unique_size= 2; + size_t unique_size= 1; + const char *when= NULL; + size_t when_size= 0; gearman_job_priority_t priority= GEARMAN_JOB_PRIORITY_NORMAL; gearmand_info("Receiving HTTP response"); @@ -294,7 +454,7 @@ { gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "bad request line: %.*s", (uint32_t)request_size, request); set_response(gearmand::protocol::httpd::HTTP_NOT_FOUND); - ret_ptr= GEARMAND_SUCCESS; + ret_ptr= GEARMAND_INVALID_PACKET; return 0; } @@ -330,7 +490,7 @@ { gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "bad method: %.*s", (uint32_t)method_size, method_str); set_response(gearmand::protocol::httpd::HTTP_METHOD_NOT_ALLOWED); - ret_ptr= GEARMAND_SUCCESS; + ret_ptr= GEARMAND_INVALID_PACKET; return 0; } } @@ -361,9 +521,33 @@ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "HTTP URI: \"%.*s\"", (int)uri_size, uri); switch (method()) { + case gearmand::protocol::httpd::GET: + { + const char *arg= (const char*)memchr(uri, '?', uri_size); + if (arg != NULL) + { + uri_size= arg -uri; + while (*arg == '?' || *arg == '&') + { + arg++; + } + packet->data_size+= (size_t)(version - arg); + char *new_data= static_cast(realloc((char *)packet->data, packet->data_size)); + if (new_data == NULL) + { + gearmand_merror("realloc", char, packet->data_size); + ret_ptr= GEARMAND_MEMORY_ALLOCATION_FAILURE; + return 0; + } + packet->data= new_data; + packet->options.free_data= true; + memcpy((char *)packet->data, arg, packet->data_size); + connection->con.recv_data_offset+= packet->data_size; + } + } + case gearmand::protocol::httpd::POST: case gearmand::protocol::httpd::PUT: - case gearmand::protocol::httpd::GET: if (uri_size == 0) { gearmand_error("must give function name in URI"); @@ -415,7 +599,7 @@ char content_length[11]; /* 11 bytes to fit max display length of uint32_t */ snprintf(content_length, sizeof(content_length), "%.*s", (int)header_size - 16, header + 16); - packet->data_size= size_t(atoi(content_length)); + packet->data_size+= size_t(atoi(content_length)); } } else if (header_size == 22 and @@ -429,6 +613,12 @@ unique= header + 18; unique_size= header_size -18; } + else if (header_size > 16 and + strncasecmp(header, "X-Gearman-When: ", 16) == 0) + { + when= header + 16; + when_size= header_size -16; + } else if (header_size == 26 and strncasecmp(header, "X-Gearman-Background: true", 26) == 0) { @@ -492,7 +682,22 @@ } else { - if (background()) + if (when != NULL and when_size > 0) + { + if (priority == GEARMAN_JOB_PRIORITY_NORMAL) + { + packet->command= GEARMAN_COMMAND_SUBMIT_JOB_EPOCH; + } + else if (priority == GEARMAN_JOB_PRIORITY_HIGH) + { + packet->command= GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH; + } + else + { + packet->command= GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH; + } + } + else if (background()) { if (priority == GEARMAN_JOB_PRIORITY_NORMAL) { @@ -542,6 +747,15 @@ packet->arg[0][uri_size]= 0; packet->arg[1][unique_size]= 0; + if (when != NULL and when_size > 0) + { + if ((ret_ptr= gearmand_packet_create(packet, when, when_size +1)) != GEARMAND_SUCCESS) + { + return 0; + } + packet->arg[2][when_size]= 0; + } + ret_ptr= GEARMAND_SUCCESS; } @@ -592,10 +806,8 @@ void reset() { - _sent_header= false; _background= false; _keep_alive= false; - content.clear(); _method= gearmand::protocol::httpd::TRACE; _http_response= gearmand::protocol::httpd::HTTP_OK; } @@ -624,13 +836,13 @@ } private: + job_contents_st *contents_list; + job_contents_st *contents_end; gearmand::protocol::httpd::method_t _method; - bool _sent_header; bool _background; bool _keep_alive; std::string global_port; gearmand::protocol::httpd::response_t _http_response; - std::vector content; }; static gearmand_error_t _http_con_remove(gearman_server_con_st*) --- libgearman-server/plugins/queue/mysql/queue.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/plugins/queue/mysql/queue.cc 2014-08-18 15:55:19.891842049 +0800 @@ -78,6 +78,10 @@ MYSQL *con; MYSQL_STMT *add_stmt; MYSQL_STMT *done_stmt; + bool mysql_ssl; + std::string mysql_ca; + std::string mysql_cert; + std::string mysql_key; std::string mysql_host; std::string mysql_user; std::string mysql_password; @@ -97,9 +101,17 @@ Queue("MySQL"), con(NULL), add_stmt(NULL), - done_stmt(NULL) + done_stmt(NULL), + mysql_ssl(false), + mysql_ca(""), + mysql_cert(""), + mysql_key("") { command_line_options().add_options() + ("mysql-ssl", boost::program_options::bool_switch(&mysql_ssl)->default_value(false), "MySQL SSL enabled.") + ("mysql-ca", boost::program_options::value(&mysql_ca), "MySQL SSL CA file.") + ("mysql-cert", boost::program_options::value(&mysql_cert), "MySQL SSL certificate file.") + ("mysql-key", boost::program_options::value(&mysql_key), "MySQL SSL key file.") ("mysql-host", boost::program_options::value(&mysql_host)->default_value("localhost"), "MySQL host.") ("mysql-port", boost::program_options::value(&_port)->default_value(3306), "Port of server. (by default 3306)") ("mysql-user", boost::program_options::value(&mysql_user)->default_value(""), "MySQL user.") @@ -129,6 +141,10 @@ { char query_buffer[1024]; + if (this->add_stmt != NULL) + { + mysql_stmt_close(this->add_stmt); + } if ((this->add_stmt= mysql_stmt_init(this->con)) == NULL) { gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con)); @@ -153,6 +169,10 @@ { char query_buffer[1024]; + if (this->done_stmt != NULL) + { + mysql_stmt_close(this->done_stmt); + } if ((this->done_stmt= mysql_stmt_init(this->con)) == NULL) { gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con)); @@ -218,6 +238,15 @@ mysql_options(queue->con, MYSQL_READ_DEFAULT_GROUP, "gearmand"); + if (queue->mysql_ssl) + { + mysql_ssl_set(queue->con, + queue->mysql_key.c_str(), + queue->mysql_cert.c_str(), + queue->mysql_ca.c_str(), + NULL, NULL); + } + if (!mysql_real_connect(queue->con, queue->mysql_host.c_str(), queue->mysql_user.c_str(), @@ -329,32 +358,21 @@ { if (mysql_stmt_bind_param(queue->add_stmt, bind)) { - if ( mysql_stmt_errno(queue->add_stmt) == CR_NO_PREPARE_STMT ) + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param (%.*s %.*s) failed: [%d]%s", function_name_size, function_name, unique_size, unique, mysql_stmt_errno(queue->add_stmt), mysql_stmt_error(queue->add_stmt)); + if (queue->prepareAddStatement() != GEARMAND_QUEUE_ERROR) { - if (queue->prepareAddStatement() == GEARMAND_QUEUE_ERROR) - { - return GEARMAND_QUEUE_ERROR; - } continue; } - else - { - gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con)); - return GEARMAND_QUEUE_ERROR; - } + return GEARMAND_QUEUE_ERROR; } if (mysql_stmt_execute(queue->add_stmt)) { - if ( mysql_stmt_errno(queue->add_stmt) == CR_SERVER_LOST ) + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute (%.*s %.*s) failed: [%d]%s", function_name_size, function_name, unique_size, unique, mysql_stmt_errno(queue->add_stmt), mysql_stmt_error(queue->add_stmt)); + if (queue->prepareAddStatement() != GEARMAND_QUEUE_ERROR) { - mysql_stmt_close(queue->add_stmt); - if (queue->prepareAddStatement() != GEARMAND_QUEUE_ERROR) - { - continue; - } + continue; } - gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con)); return GEARMAND_QUEUE_ERROR; } @@ -378,7 +396,6 @@ const char *function_name, size_t function_name_size) { - MYSQL_BIND bind[2]; gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue done: %.*s %.*s", (uint32_t) unique_size, (char *) unique, @@ -402,33 +419,21 @@ { if (mysql_stmt_bind_param(queue->done_stmt, bind)) { - if ( mysql_stmt_errno(queue->done_stmt) == CR_NO_PREPARE_STMT ) + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param (%.*s %.*s) failed: [%d]%s", function_name_size, function_name, unique_size, unique, mysql_stmt_errno(queue->done_stmt), mysql_stmt_error(queue->done_stmt)); + if (queue->prepareDoneStatement() != GEARMAND_QUEUE_ERROR) { - if (queue->prepareDoneStatement() == GEARMAND_QUEUE_ERROR) - { - return GEARMAND_QUEUE_ERROR; - } continue; } - else - { - gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con)); - return GEARMAND_QUEUE_ERROR; - } + return GEARMAND_QUEUE_ERROR; } if (mysql_stmt_execute(queue->done_stmt)) { - if ( mysql_stmt_errno(queue->done_stmt) == CR_SERVER_LOST ) + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute (%.*s %.*s) failed: [%d]%s", function_name_size, function_name, unique_size, unique, mysql_stmt_errno(queue->done_stmt), mysql_stmt_error(queue->done_stmt)); + if (queue->prepareDoneStatement() != GEARMAND_QUEUE_ERROR) { - mysql_stmt_close(queue->done_stmt); - if (queue->prepareDoneStatement() != GEARMAND_QUEUE_ERROR) - { - continue; - } + continue; } - gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con)); - return GEARMAND_QUEUE_ERROR; } --- libgearman-server/server.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/server.cc 2014-08-12 09:48:59.225350624 +0800 @@ -56,6 +56,7 @@ #include "libgearman-1.0/return.h" #include "libgearman-1.0/strerror.h" #include "libgearman/magic.h" +#include "libgearman/command.h" /* * Private declarations @@ -205,11 +206,13 @@ /* Client requests. */ case GEARMAN_COMMAND_SUBMIT_JOB: case GEARMAN_COMMAND_SUBMIT_JOB_BG: + case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: case GEARMAN_COMMAND_SUBMIT_JOB_HIGH: case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG: + case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH: case GEARMAN_COMMAND_SUBMIT_JOB_LOW: case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG: - case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: + case GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH: { gearman_job_priority_t priority; @@ -220,7 +223,8 @@ priority= GEARMAN_JOB_PRIORITY_NORMAL; } else if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH or - packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG) + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG or + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH) { priority= GEARMAN_JOB_PRIORITY_HIGH; } @@ -232,7 +236,9 @@ if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG or packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG or packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG or - packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH) + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH or + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH or + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH) { server_client= NULL; } @@ -251,7 +257,9 @@ packet->arg_size[1], packet->arg[1], (int)packet->argc); int64_t when= 0; - if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH) + if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH or + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH or + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH) { char *endptr; // @note stroll will set errno if error, but it might also leave errno @@ -303,15 +311,18 @@ } /* Queue the job created packet. */ - ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE, - GEARMAN_COMMAND_JOB_CREATED, - server_job->job_handle, - (size_t)strlen(server_job->job_handle), - NULL); - if (gearmand_failed(ret)) + if (server_con->protocol->is_response_once() == false or server_client == NULL) { - gearman_server_client_free(server_client); - return gearmand_gerror("gearman_server_io_packet_add", ret); + ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE, + GEARMAN_COMMAND_JOB_CREATED, + server_job->job_handle, + (size_t)strlen(server_job->job_handle), + NULL); + if (gearmand_failed(ret)) + { + gearman_server_client_free(server_client); + return gearmand_gerror("gearman_server_io_packet_add", ret); + } } gearmand_log_notice(GEARMAN_DEFAULT_LOG_PARAM,"accepted,%.*s,%.*s,%jd", @@ -1065,7 +1076,7 @@ if (gearmand_failed(ret)) { - gearmand_log_gerror_warn(GEARMAN_DEFAULT_LOG_PARAM, ret, "Failed to send WORK_FAIL packet to %s:%s", server_client->con->host(), server_client->con->port()); + gearmand_log_gerror_warn(GEARMAN_DEFAULT_LOG_PARAM, ret, "Failed to send %s packet to %s:%s", gearman_enum_strcommand(command), server_client->con->host(), server_client->con->port()); } } --- libgearman-server/struct/function.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/struct/function.h 2014-08-04 16:32:33.288752942 +0800 @@ -49,7 +49,16 @@ gearman_server_function_st *prev; char *function_name; gearman_server_worker_st *worker_list; + struct gearman_server_job_st *job_epoch_list[GEARMAN_JOB_PRIORITY_MAX]; + struct gearman_server_job_st *job_epoch_end[GEARMAN_JOB_PRIORITY_MAX]; + struct gearman_server_job_st *job_bg_list[GEARMAN_JOB_PRIORITY_MAX]; + struct gearman_server_job_st *job_bg_end[GEARMAN_JOB_PRIORITY_MAX]; struct gearman_server_job_st *job_list[GEARMAN_JOB_PRIORITY_MAX]; - gearman_server_job_st *job_end[GEARMAN_JOB_PRIORITY_MAX]; + struct gearman_server_job_st *job_end[GEARMAN_JOB_PRIORITY_MAX]; }; +struct gearman_server_epoch_function_st +{ + gearman_server_function_st *function; + gearman_server_epoch_function_st *next; +}; --- libgearman-server/struct/job.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/struct/job.h 2014-08-04 21:14:43.728737682 +0800 @@ -68,3 +68,10 @@ char unique[GEARMAN_MAX_UNIQUE_SIZE]; char reducer[GEARMAN_FUNCTION_MAX_SIZE]; }; + +struct gearman_server_epoch_job_st +{ + int64_t when; + gearman_server_epoch_job_st *next; + gearman_server_epoch_function_st *function_list; +}; --- libgearman-server/struct/server.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/struct/server.h 2014-08-04 20:17:17.312039668 +0800 @@ -80,6 +80,7 @@ } flags; struct State { bool queue_startup; + bool epoch_startup; } state; bool shutdown; bool shutdown_graceful; @@ -111,6 +112,7 @@ uint32_t hashtable_buckets; gearman_server_job_st **job_hash; gearman_server_job_st **unique_hash; + gearman_server_epoch_job_st *epoch_job_list; gearman_server_st() { --- libgearman-server/text.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/text.cc 2014-08-20 15:14:17.735145927 +0800 @@ -51,9 +51,70 @@ #define TEXT_ERROR_CREATE_FUNCTION "ERR CREATE_FUNCTION %.*s\r\n" #define TEXT_ERROR_UNKNOWN_COMMAND "ERR UNKNOWN_COMMAND Unknown+server+command%.*s\r\n" #define TEXT_ERROR_INTERNAL_ERROR "ERR UNKNOWN_ERROR\r\n" +#define TEXT_ERROR_UNKNOWN_CANCEL_ARGUMENTS "ERR UNKNOWN_CANCEL_ARGUMENTS\r\n" #define TEXT_ERROR_UNKNOWN_SHOW_ARGUMENTS "ERR UNKNOWN_SHOW_ARGUMENTS\r\n" #define TEXT_ERROR_UNKNOWN_JOB "ERR UNKNOWN_JOB\r\n" +static void cancel_server_job_unique(gearman_vector_st &data, gearman_server_job_st *server_job) +{ + gearmand_error_t ret= _gearman_server_job_cancel(server_job); + if (ret == GEARMAND_SUCCESS) + { + data.vec_append_printf("%.*s\t%s", server_job->unique_length, server_job->unique, TEXT_SUCCESS); + } + else + { + data.vec_append_printf("%.*s\t%s", server_job->unique_length, server_job->unique, TEXT_ERROR_INTERNAL_ERROR); + } +} + +static void cancel_server_job_handle(gearman_vector_st &data, gearman_server_job_st *server_job) +{ + gearmand_error_t ret= _gearman_server_job_cancel(server_job); + if (ret == GEARMAND_SUCCESS) + { + data.vec_append_printf("%s\t%s", server_job->job_handle, TEXT_SUCCESS); + } + else + { + data.vec_append_printf("%s\t%s", server_job->job_handle, TEXT_ERROR_INTERNAL_ERROR); + } +} + +static void print_server_job_unique(gearman_vector_st &data, gearman_server_job_st *server_job) +{ + char when[20] = "0000-00-00 00:00:00"; + if (server_job->when > 0) + { + struct tm t; + strftime(when, 20, "%Y-%m-%d %H:%M:%S", localtime_r(&(server_job->when), &t)); + } + data.vec_append_printf("%.*s\t%d\t%d\t%d\t%s\t%d\t%d\t%d\t%d\t%s\t%.*s\n", + server_job->unique_length, server_job->unique, + server_job->retries, server_job->ignore_job, server_job->job_queued, + when, server_job->priority, + (server_job->worker == NULL) ? 0 : 1, server_job->numerator, server_job->denominator, + server_job->job_handle, + server_job->function->function_name_size, server_job->function->function_name); +} + +static void print_server_job_handle(gearman_vector_st &data, gearman_server_job_st *server_job) +{ + char when[20] = "0000-00-00 00:00:00"; + if (server_job->when > 0) + { + struct tm t; + strftime(when, 20, "%Y-%m-%d %H:%M:%S", localtime_r(&(server_job->when), &t)); + } + data.vec_append_printf("%s\t%d\t%d\t%d\t%s\t%d\t%d\t%d\t%d\t%.*s\t%.*s\n", + server_job->job_handle, + server_job->retries, server_job->ignore_job, server_job->job_queued, + when, server_job->priority, + (server_job->worker == NULL) ? 0 : 1, server_job->numerator, server_job->denominator, + server_job->unique_length, server_job->unique, + server_job->function->function_name_size, server_job->function->function_name); +} + gearmand_error_t server_run_text(gearman_server_con_st *server_con, gearmand_packet_st *packet) { @@ -138,60 +199,357 @@ data.vec_append_printf(".\n"); } - else if (packet->argc >= 3 - and strcasecmp("cancel", (char *)(packet->arg[0])) == 0) + else if (strcasecmp("cancel", packet->arg[0]) == 0) { - if (packet->argc == 3 - and strcasecmp("job", (char *)(packet->arg[1])) == 0) + if (packet->argc == 4 and strcasecmp("unique", packet->arg[1]) == 0 and strcasecmp("job", packet->arg[2]) == 0) + { + gearmand_error_t ret= gearman_server_job_cancel_by_unique(Gearmand()->server, packet->arg[3], strlen(packet->arg[3])); + if (ret == GEARMAND_SUCCESS) + { + data.vec_printf(TEXT_SUCCESS); + } + else if (ret == GEARMAND_NO_JOBS) + { + data.vec_printf(TEXT_ERROR_UNKNOWN_JOB); + } + else + { + data.vec_printf(TEXT_ERROR_INTERNAL_ERROR); + } + } + else if (packet->argc == 3 and strcasecmp("job", packet->arg[1]) == 0) { gearmand_error_t ret= gearman_server_job_cancel(Gearmand()->server, packet->arg[2], strlen(packet->arg[2])); - if (ret == GEARMAND_SUCCESS) { data.vec_printf(TEXT_SUCCESS); } - else if (ret != GEARMAND_NO_JOBS) + else if (ret == GEARMAND_NO_JOBS) + { + data.vec_printf(TEXT_ERROR_UNKNOWN_JOB); + } + else { data.vec_printf(TEXT_ERROR_INTERNAL_ERROR); } + } + else if (packet->argc >= 3 and strcasecmp("unique", packet->arg[1]) == 0 and strcasecmp("jobs", packet->arg[2]) == 0) + { + if (packet->argc >= 4 and strcasecmp("-", packet->arg[3]) != 0) + { + char *args= packet->arg[3]; + int64_t size= strlen(packet->arg[3]); + while (size > 0) + { + char *comma= (char *)memchr(args, ',', size); + int64_t length= (comma == NULL) ? size : (comma -args); + if (length > 0) + { + gearman_server_job_st *server_job= gearman_server_job_get_by_unique(Server, args, length, NULL); + if (server_job != NULL) + { + cancel_server_job_unique(data, server_job); + } + else + { + data.vec_append_printf("%.*s\t%s", length, args, TEXT_ERROR_UNKNOWN_JOB); + } + } + args+= (length +1); + size-= (length +1); + } + } + else if (packet->argc >= 5 and strcasecmp("-", packet->arg[4]) != 0) + { + char *args= packet->arg[4]; + int64_t size= strlen(packet->arg[4]); + while (size > 0) + { + char *comma= (char *)memchr(args, ',', size); + int64_t length= (comma == NULL) ? size : (comma -args); + if (length > 0) + { + gearman_server_function_st *server_function= gearman_server_function_get(Server, args, length, false); + if (server_function != NULL and server_function->job_count > 0) + { + for (gearman_job_priority_t priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_MAX; priority= gearman_job_priority_t(int(priority) +1)) + { + for (gearman_server_job_st *server_job= server_function->job_list[priority]; server_job != NULL; server_job= server_job->function_next) + { + cancel_server_job_unique(data, server_job); + } + for (gearman_server_job_st *server_job= server_function->job_bg_list[priority]; server_job != NULL; server_job= server_job->function_next) + { + cancel_server_job_unique(data, server_job); + } + for (gearman_server_job_st *server_job= server_function->job_epoch_list[priority]; server_job != NULL; server_job= server_job->function_next) + { + cancel_server_job_unique(data, server_job); + } + } + } + } + args+= (length +1); + size-= (length +1); + } + } else { - data.vec_printf(TEXT_ERROR_UNKNOWN_JOB); + for (size_t x= 0; x < Server->hashtable_buckets; x++) + { + for (gearman_server_job_st* server_job= Server->unique_hash[x]; server_job != NULL; server_job= server_job->unique_next) + { + cancel_server_job_unique(data, server_job); + } + } } + data.vec_append_printf(".\n"); + } + else if (packet->argc >= 2 and strcasecmp("jobs", packet->arg[1]) == 0) + { + if (packet->argc >= 3 and strcasecmp("-", packet->arg[2]) != 0) + { + char *args= packet->arg[2]; + int64_t size= strlen(packet->arg[2]); + while (size > 0) + { + char *comma= (char *)memchr(args, ',', size); + int64_t length= (comma == NULL) ? size : (comma -args); + if (length > 0) + { + gearman_server_job_st *server_job= gearman_server_job_get(Server, args, length, NULL); + if (server_job != NULL) + { + cancel_server_job_handle(data, server_job); + } + else + { + data.vec_append_printf("%.*s\t%s", length, args, TEXT_ERROR_UNKNOWN_JOB); + } + } + args+= (length +1); + size-= (length +1); + } + } + else if (packet->argc >= 4 and strcasecmp("-", packet->arg[3]) != 0) + { + char *args= packet->arg[3]; + int64_t size= strlen(packet->arg[3]); + while (size > 0) + { + char *comma= (char *)memchr(args, ',', size); + int64_t length= (comma == NULL) ? size : (comma -args); + if (length > 0) + { + gearman_server_function_st *server_function= gearman_server_function_get(Server, args, length, false); + if (server_function != NULL and server_function->job_count > 0) + { + for (gearman_job_priority_t priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_MAX; priority= gearman_job_priority_t(int(priority) +1)) + { + for (gearman_server_job_st *server_job= server_function->job_list[priority]; server_job != NULL; server_job= server_job->function_next) + { + cancel_server_job_handle(data, server_job); + } + for (gearman_server_job_st *server_job= server_function->job_bg_list[priority]; server_job != NULL; server_job= server_job->function_next) + { + cancel_server_job_handle(data, server_job); + } + for (gearman_server_job_st *server_job= server_function->job_epoch_list[priority]; server_job != NULL; server_job= server_job->function_next) + { + cancel_server_job_handle(data, server_job); + } + } + } + } + args+= (length +1); + size-= (length +1); + } + } + else + { + for (size_t x= 0; x < Server->hashtable_buckets; ++x) + { + for (gearman_server_job_st *server_job= Server->job_hash[x]; server_job != NULL; server_job= server_job->next) + { + cancel_server_job_handle(data, server_job); + } + } + } + data.vec_append_printf(".\n"); + } + else + { + data.vec_printf(TEXT_ERROR_UNKNOWN_CANCEL_ARGUMENTS); } } - else if (packet->argc >= 2 and strcasecmp("show", (char *)(packet->arg[0])) == 0) + else if (strcasecmp("show", packet->arg[0]) == 0) { - if (packet->argc == 3 - and strcasecmp("unique", (char *)(packet->arg[1])) == 0 - and strcasecmp("jobs", (char *)(packet->arg[2])) == 0) + if (packet->argc == 4 and strcasecmp("unique", packet->arg[1]) == 0 and strcasecmp("job", packet->arg[2]) == 0) + { + gearman_server_job_st *server_job= gearman_server_job_get_by_unique(Server, packet->arg[3], strlen(packet->arg[3]), NULL); + if (server_job != NULL) + { + print_server_job_unique(data, server_job); + } + else + { + data.vec_printf(TEXT_ERROR_UNKNOWN_JOB); + } + } + else if (packet->argc == 3 and strcasecmp("job", packet->arg[1]) == 0) + { + gearman_server_job_st *server_job= gearman_server_job_get(Server, packet->arg[2], strlen(packet->arg[2]), NULL); + if (server_job != NULL) + { + print_server_job_handle(data, server_job); + } + else + { + data.vec_printf(TEXT_ERROR_UNKNOWN_JOB); + } + } + else if (packet->argc >= 3 and strcasecmp("unique", packet->arg[1]) == 0 and strcasecmp("jobs", packet->arg[2]) == 0) { - for (size_t x= 0; x < Server->hashtable_buckets; x++) + if (packet->argc >= 4 and strcasecmp("-", packet->arg[3]) != 0) { - for (gearman_server_job_st* server_job= Server->unique_hash[x]; - server_job != NULL; - server_job= server_job->unique_next) + char *args= packet->arg[3]; + int64_t size= strlen(packet->arg[3]); + while (size > 0) { - data.vec_append_printf("%.*s\n", int(server_job->unique_length), server_job->unique); + char *comma= (char *)memchr(args, ',', size); + int64_t length= (comma == NULL) ? size : (comma -args); + if (length > 0) + { + gearman_server_job_st *server_job= gearman_server_job_get_by_unique(Server, args, length, NULL); + if (server_job != NULL) + { + print_server_job_unique(data, server_job); + } + else + { + data.vec_append_printf("%.*s\t%s", length, args, TEXT_ERROR_UNKNOWN_JOB); + } + } + args+= (length +1); + size-= (length +1); + } + } + else if (packet->argc >= 5 and strcasecmp("-", packet->arg[4]) != 0) + { + char *args= packet->arg[4]; + int64_t size= strlen(packet->arg[4]); + while (size > 0) + { + char *comma= (char *)memchr(args, ',', size); + int64_t length= (comma == NULL) ? size : (comma -args); + if (length > 0) + { + gearman_server_function_st *server_function= gearman_server_function_get(Server, args, length, false); + if (server_function != NULL and server_function->job_count > 0) + { + for (gearman_job_priority_t priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_MAX; priority= gearman_job_priority_t(int(priority) +1)) + { + for (gearman_server_job_st *server_job= server_function->job_list[priority]; server_job != NULL; server_job= server_job->function_next) + { + print_server_job_unique(data, server_job); + } + for (gearman_server_job_st *server_job= server_function->job_bg_list[priority]; server_job != NULL; server_job= server_job->function_next) + { + print_server_job_unique(data, server_job); + } + for (gearman_server_job_st *server_job= server_function->job_epoch_list[priority]; server_job != NULL; server_job= server_job->function_next) + { + print_server_job_unique(data, server_job); + } + } + } + } + args+= (length +1); + size-= (length +1); + } + } + else + { + for (size_t x= 0; x < Server->hashtable_buckets; x++) + { + for (gearman_server_job_st* server_job= Server->unique_hash[x]; server_job != NULL; server_job= server_job->unique_next) + { + print_server_job_unique(data, server_job); + } } } - data.vec_append_printf(".\n"); } - else if (packet->argc == 2 - and strcasecmp("jobs", (char *)(packet->arg[1])) == 0) + else if (packet->argc >= 2 and strcasecmp("jobs", packet->arg[1]) == 0) { - for (size_t x= 0; x < Server->hashtable_buckets; ++x) + if (packet->argc >= 3 and strcasecmp("-", packet->arg[2]) != 0) { - for (gearman_server_job_st *server_job= Server->job_hash[x]; - server_job != NULL; - server_job= server_job->next) + char *args= packet->arg[2]; + int64_t size= strlen(packet->arg[2]); + while (size > 0) { - data.vec_append_printf("%s\t%u\t%u\t%u\n", server_job->job_handle, uint32_t(server_job->retries), - uint32_t(server_job->ignore_job), uint32_t(server_job->job_queued)); + char *comma= (char *)memchr(args, ',', size); + int64_t length= (comma == NULL) ? size : (comma -args); + if (length > 0) + { + gearman_server_job_st *server_job= gearman_server_job_get(Server, args, length, NULL); + if (server_job != NULL) + { + print_server_job_handle(data, server_job); + } + else + { + data.vec_append_printf("%.*s\t%s", length, args, TEXT_ERROR_UNKNOWN_JOB); + } + } + args+= (length +1); + size-= (length +1); + } + } + else if (packet->argc >= 4 and strcasecmp("-", packet->arg[3]) != 0) + { + char *args= packet->arg[3]; + int64_t size= strlen(packet->arg[3]); + while (size > 0) + { + char *comma= (char *)memchr(args, ',', size); + int64_t length= (comma == NULL) ? size : (comma -args); + if (length > 0) + { + gearman_server_function_st *server_function= gearman_server_function_get(Server, args, length, false); + if (server_function != NULL and server_function->job_count > 0) + { + for (gearman_job_priority_t priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_MAX; priority= gearman_job_priority_t(int(priority) +1)) + { + for (gearman_server_job_st *server_job= server_function->job_list[priority]; server_job != NULL; server_job= server_job->function_next) + { + print_server_job_handle(data, server_job); + } + for (gearman_server_job_st *server_job= server_function->job_bg_list[priority]; server_job != NULL; server_job= server_job->function_next) + { + print_server_job_handle(data, server_job); + } + for (gearman_server_job_st *server_job= server_function->job_epoch_list[priority]; server_job != NULL; server_job= server_job->function_next) + { + print_server_job_handle(data, server_job); + } + } + } + } + args+= (length +1); + size-= (length +1); + } + } + else + { + for (size_t x= 0; x < Server->hashtable_buckets; ++x) + { + for (gearman_server_job_st *server_job= Server->job_hash[x]; server_job != NULL; server_job= server_job->next) + { + print_server_job_handle(data, server_job); + } } } - data.vec_append_printf(".\n"); } else --- tests/libgearman-1.0/protocol.cc 2014-02-12 08:05:28.000000000 +0800 +++ tests/libgearman-1.0/protocol.cc 2014-08-02 20:22:29.445297135 +0800 @@ -92,6 +92,10 @@ ASSERT_EQ(38, int(GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND)); ASSERT_EQ(39, int(GEARMAN_COMMAND_GRAB_JOB_ALL)); ASSERT_EQ(40, int(GEARMAN_COMMAND_JOB_ASSIGN_ALL)); + ASSERT_EQ(41, int(GEARMAN_COMMAND_GET_STATUS_UNIQUE)); + ASSERT_EQ(42, int(GEARMAN_COMMAND_STATUS_RES_UNIQUE)); + ASSERT_EQ(43, int(GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH)); + ASSERT_EQ(44, int(GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH)); return TEST_SUCCESS; } --- util/instance.cc 2014-02-12 08:05:28.000000000 +0800 +++ util/instance.cc 2014-08-03 23:22:25.045333811 +0800 @@ -125,7 +125,7 @@ SSL_load_error_strings(); SSL_library_init(); - if ((_ctx_ssl= SSL_CTX_new(TLSv1_client_method())) == NULL) + if ((_ctx_ssl= SSL_CTX_new(SSLv23_client_method())) == NULL) { _last_error= "SSL_CTX_new error"; return false; @@ -154,6 +154,14 @@ _last_error= message.str(); return false; } + + if (SSL_CTX_check_private_key(_ctx_ssl) != SSL_SUCCESS) + { + std::stringstream message; + message << "Error check private key."; + _last_error= message.str(); + return false; + } #endif // defined(HAVE_CYASSL) && HAVE_CYASSL return true; } @@ -273,6 +281,8 @@ _last_error= "SSL_set_fd() failed"; return false; } + + SSL_set_connect_state(_ssl); } #endif @@ -315,6 +325,10 @@ case SSL_ERROR_SSL: default: { + if (ERR_peek_last_error()) + { + ssl_error= ERR_peek_last_error(); + } char cyassl_error_buffer[SSL_ERROR_SIZE]= { 0 }; ERR_error_string_n(ssl_error, cyassl_error_buffer, sizeof(cyassl_error_buffer)); _last_error= cyassl_error_buffer; @@ -398,6 +412,10 @@ case SSL_ERROR_SSL: default: { + if (ERR_peek_last_error()) + { + ssl_error= ERR_peek_last_error(); + } char cyassl_error_buffer[SSL_ERROR_SIZE]= { 0 }; ERR_error_string_n(ssl_error, cyassl_error_buffer, sizeof(cyassl_error_buffer)); _last_error= cyassl_error_buffer;