--- libgearman/add.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/add.cc 2014-08-03 01:52:11.415246532 +0800 @@ -63,6 +63,8 @@ switch (command) { case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: + case GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH: + case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH: case GEARMAN_COMMAND_SUBMIT_JOB_SCHED: case GEARMAN_COMMAND_SUBMIT_JOB_BG: case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG: @@ -257,9 +259,12 @@ break; case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: + case GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH: + case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH: rc= libgearman::protocol::submit_epoch(task->client->universal, task->send, final_unique, + command, function, workload, when); --- libgearman/client.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/client.cc 2014-08-03 17:35:02.875344990 +0800 @@ -546,6 +546,15 @@ } } +void gearman_client_set_ssl(gearman_client_st *client_shell, bool ssl, + const char *ca_file, const char *certificate, const char *key_file) +{ + if (client_shell && client_shell->impl()) + { + gearman_universal_set_ssl(client_shell->impl()->universal, ssl, ca_file, certificate, key_file); + } +} + void *gearman_client_context(const gearman_client_st *client_shell) { if (client_shell and client_shell->impl()) @@ -1217,6 +1226,96 @@ } +gearman_task_st *gearman_client_add_task_epoch(gearman_client_st *client, + gearman_task_st *task, + void *context, + const char *function, + const char *unique, + const void *workload, size_t workload_size, + time_t when, + gearman_return_t *ret_ptr) +{ + gearman_return_t unused; + if (ret_ptr == NULL) + { + ret_ptr= &unused; + } + + if (client == NULL or client->impl() == NULL) + { + *ret_ptr= GEARMAN_INVALID_ARGUMENT; + return NULL; + } + + return add_task_ptr(*(client->impl()), task, context, GEARMAN_COMMAND_SUBMIT_JOB_EPOCH, + function, + unique, + workload, workload_size, + when, + *ret_ptr, + client->impl()->actions); +} + +gearman_task_st *gearman_client_add_task_high_epoch(gearman_client_st *client, + gearman_task_st *task, + void *context, + const char *function, + const char *unique, + const void *workload, size_t workload_size, + time_t when, + gearman_return_t *ret_ptr) +{ + gearman_return_t unused; + if (ret_ptr == NULL) + { + ret_ptr= &unused; + } + + if (client == NULL or client->impl() == NULL) + { + *ret_ptr= GEARMAN_INVALID_ARGUMENT; + return NULL; + } + + return add_task_ptr(*(client->impl()), task, context, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH, + function, + unique, + workload, workload_size, + when, + *ret_ptr, + client->impl()->actions); +} + +gearman_task_st *gearman_client_add_task_low_epoch(gearman_client_st *client, + gearman_task_st *task, + void *context, + const char *function, + const char *unique, + const void *workload, size_t workload_size, + time_t when, + gearman_return_t *ret_ptr) +{ + gearman_return_t unused; + if (ret_ptr == NULL) + { + ret_ptr= &unused; + } + + if (client == NULL or client->impl() == NULL) + { + *ret_ptr= GEARMAN_INVALID_ARGUMENT; + return NULL; + } + + return add_task_ptr(*(client->impl()), task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH, + function, + unique, + workload, workload_size, + when, + *ret_ptr, + client->impl()->actions); +} + gearman_task_st *gearman_client_add_task_status(gearman_client_st *client_shell, gearman_task_st *task_shell, void *context, --- libgearman/client.hpp 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/client.hpp 2014-07-07 14:23:34.249335984 +0800 @@ -121,6 +121,7 @@ void enable_ssl() { + return; if (getenv("GEARMAND_CA_CERTIFICATE")) { gearman_client_add_options(_client, GEARMAN_CLIENT_SSL); --- libgearman/command.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/command.cc 2014-08-02 19:54:15.275309633 +0800 @@ -93,7 +93,9 @@ { "GEARMAN_GRAB_JOB_ALL", GEARMAN_COMMAND_GRAB_JOB_ALL, 0, false }, { "GEARMAN_JOB_ASSIGN_ALL", GEARMAN_COMMAND_JOB_ASSIGN_ALL, 4, true }, { "GEARMAN_GET_STATUS_UNIQUE", GEARMAN_COMMAND_GET_STATUS_UNIQUE, 1, false }, - { "GEARMAN_STATUS_RES_UNIQUE", GEARMAN_COMMAND_STATUS_RES_UNIQUE, 6, false } + { "GEARMAN_STATUS_RES_UNIQUE", GEARMAN_COMMAND_STATUS_RES_UNIQUE, 6, false }, + { "GEARMAN_SUBMIT_JOB_HIGH_EPOCH", GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH, 3, true }, + { "GEARMAN_SUBMIT_JOB_LOW_EPOCH", GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH, 3, true } }; const char *gearman_strcommand(gearman_command_t command) --- libgearman/command.gperf 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/command.gperf 2014-08-02 20:00:02.315307073 +0800 @@ -106,4 +106,6 @@ JOB_ASSIGN_ALL, GEARMAN_COMMAND_JOB_ASSIGN_ALL GET_STATUS_UNIQUE, GEARMAN_COMMAND_GET_STATUS_UNIQUE STATUS_RES_UNIQUE, GEARMAN_COMMAND_STATUS_RES_UNIQUE +SUBMIT_JOB_HIGH_EPOCH, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH +SUBMIT_JOB_LOW_EPOCH, GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH %% --- libgearman/command.hpp 2014-02-12 08:06:02.000000000 +0800 +++ libgearman/command.hpp 2014-08-02 20:06:36.115304168 +0800 @@ -85,12 +85,12 @@ }; #include -#define TOTAL_KEYWORDS 43 +#define TOTAL_KEYWORDS 45 #define MIN_WORD_LENGTH 4 #define MAX_WORD_LENGTH 28 #define MIN_HASH_VALUE 4 -#define MAX_HASH_VALUE 73 -/* maximum key range = 70, duplicates = 0 */ +#define MAX_HASH_VALUE 75 +/* maximum key range = 72, duplicates = 0 */ #ifndef GPERF_DOWNCASE #define GPERF_DOWNCASE 1 @@ -150,32 +150,32 @@ { static const unsigned char asso_values[] = { - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 35, 5, 25, 10, 25, - 74, 0, 25, 74, 5, 74, 10, 74, 0, 15, - 30, 40, 15, 10, 0, 0, 74, 0, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 35, 5, 25, - 10, 25, 74, 0, 25, 74, 5, 74, 10, 74, - 0, 15, 30, 40, 15, 10, 0, 0, 74, 0, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, - 74, 74, 74, 74, 74, 74 + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 25, 5, 25, 25, 25, + 76, 0, 10, 76, 10, 76, 5, 76, 45, 30, + 0, 20, 45, 10, 0, 0, 76, 0, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 25, 5, 25, + 25, 25, 76, 0, 10, 76, 10, 76, 5, 76, + 45, 30, 0, 20, 45, 10, 0, 0, 76, 0, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, + 76, 76, 76, 76, 76, 76 }; return len + asso_values[(unsigned char)str[len - 1]] + asso_values[(unsigned char)str[0]]; } @@ -184,34 +184,26 @@ { #line 66 "libgearman/command.gperf" {"TEXT", GEARMAN_COMMAND_TEXT}, -#line 76 "libgearman/command.gperf" - {"NO_JOB", GEARMAN_COMMAND_NO_JOB}, +#line 70 "libgearman/command.gperf" + {"PRE_SLEEP", GEARMAN_COMMAND_PRE_SLEEP}, #line 95 "libgearman/command.gperf" {"WORK_WARNING", GEARMAN_COMMAND_WORK_WARNING }, #line 75 "libgearman/command.gperf" {"GRAB_JOB", GEARMAN_COMMAND_GRAB_JOB}, -#line 91 "libgearman/command.gperf" - {"WORK_EXCEPTION", GEARMAN_COMMAND_WORK_EXCEPTION }, -#line 77 "libgearman/command.gperf" - {"JOB_ASSIGN", GEARMAN_COMMAND_JOB_ASSIGN }, -#line 71 "libgearman/command.gperf" - {"UNUSED", GEARMAN_COMMAND_UNUSED}, #line 80 "libgearman/command.gperf" {"WORK_FAIL",GEARMAN_COMMAND_WORK_FAIL}, +#line 105 "libgearman/command.gperf" + {"GRAB_JOB_ALL", GEARMAN_COMMAND_GRAB_JOB_ALL }, #line 81 "libgearman/command.gperf" {"GET_STATUS",GEARMAN_COMMAND_GET_STATUS}, #line 78 "libgearman/command.gperf" {"WORK_STATUS", GEARMAN_COMMAND_WORK_STATUS}, -#line 105 "libgearman/command.gperf" - {"GRAB_JOB_ALL", GEARMAN_COMMAND_GRAB_JOB_ALL }, #line 84 "libgearman/command.gperf" {"SUBMIT_JOB_BG", GEARMAN_COMMAND_SUBMIT_JOB_BG }, #line 99 "libgearman/command.gperf" {"SUBMIT_JOB_LOW", GEARMAN_COMMAND_SUBMIT_JOB_LOW }, #line 73 "libgearman/command.gperf" {"SUBMIT_JOB", GEARMAN_COMMAND_SUBMIT_JOB }, -#line 74 "libgearman/command.gperf" - {"JOB_CREATED", GEARMAN_COMMAND_JOB_CREATED}, #line 100 "libgearman/command.gperf" {"SUBMIT_JOB_LOW_BG", GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG }, #line 98 "libgearman/command.gperf" @@ -220,54 +212,66 @@ {"JOB_ASSIGN_ALL", GEARMAN_COMMAND_JOB_ASSIGN_ALL }, #line 86 "libgearman/command.gperf" {"STATUS_RES", GEARMAN_COMMAND_STATUS_RES}, +#line 71 "libgearman/command.gperf" + {"UNUSED", GEARMAN_COMMAND_UNUSED}, #line 103 "libgearman/command.gperf" {"SUBMIT_REDUCE_JOB", GEARMAN_COMMAND_SUBMIT_REDUCE_JOB}, -#line 88 "libgearman/command.gperf" - {"SET_CLIENT_ID", GEARMAN_COMMAND_SET_CLIENT_ID}, -#line 72 "libgearman/command.gperf" - {"NOOP", GEARMAN_COMMAND_NOOP}, -#line 93 "libgearman/command.gperf" - {"OPTION_RES", GEARMAN_COMMAND_OPTION_RES}, -#line 101 "libgearman/command.gperf" - {"SUBMIT_JOB_SCHED", GEARMAN_COMMAND_SUBMIT_JOB_SCHED }, +#line 96 "libgearman/command.gperf" + {"GRAB_JOB_UNIQ", GEARMAN_COMMAND_GRAB_JOB_UNIQ}, +#line 94 "libgearman/command.gperf" + {"WORK_DATA", GEARMAN_COMMAND_WORK_DATA }, +#line 87 "libgearman/command.gperf" + {"SUBMIT_JOB_HIGH", GEARMAN_COMMAND_SUBMIT_JOB_HIGH }, +#line 102 "libgearman/command.gperf" + {"SUBMIT_JOB_EPOCH", GEARMAN_COMMAND_SUBMIT_JOB_EPOCH }, #line 79 "libgearman/command.gperf" {"WORK_COMPLETE", GEARMAN_COMMAND_WORK_COMPLETE }, #line 89 "libgearman/command.gperf" {"CAN_DO_TIMEOUT", GEARMAN_COMMAND_CAN_DO_TIMEOUT}, -#line 69 "libgearman/command.gperf" - {"RESET_ABILITIES", GEARMAN_COMMAND_RESET_ABILITIES}, +#line 110 "libgearman/command.gperf" + {"SUBMIT_JOB_LOW_EPOCH", GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH }, +#line 109 "libgearman/command.gperf" + {"SUBMIT_JOB_HIGH_EPOCH", GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH }, #line 107 "libgearman/command.gperf" {"GET_STATUS_UNIQUE", GEARMAN_COMMAND_GET_STATUS_UNIQUE}, #line 83 "libgearman/command.gperf" {"ECHO_RES", GEARMAN_COMMAND_ECHO_RES }, -#line 94 "libgearman/command.gperf" - {"WORK_DATA", GEARMAN_COMMAND_WORK_DATA }, -#line 85 "libgearman/command.gperf" - {"ERROR", GEARMAN_COMMAND_ERROR}, -#line 67 "libgearman/command.gperf" - {"CAN_DO", GEARMAN_COMMAND_CAN_DO}, -#line 68 "libgearman/command.gperf" - {"CANT_DO", GEARMAN_COMMAND_CANT_DO}, -#line 104 "libgearman/command.gperf" - {"SUBMIT_REDUCE_JOB_BACKGROUND", GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND}, -#line 87 "libgearman/command.gperf" - {"SUBMIT_JOB_HIGH", GEARMAN_COMMAND_SUBMIT_JOB_HIGH }, -#line 102 "libgearman/command.gperf" - {"SUBMIT_JOB_EPOCH", GEARMAN_COMMAND_SUBMIT_JOB_EPOCH }, -#line 108 "libgearman/command.gperf" - {"STATUS_RES_UNIQUE", GEARMAN_COMMAND_STATUS_RES_UNIQUE}, -#line 96 "libgearman/command.gperf" - {"GRAB_JOB_UNIQ", GEARMAN_COMMAND_GRAB_JOB_UNIQ}, #line 90 "libgearman/command.gperf" {"ALL_YOURS", GEARMAN_COMMAND_ALL_YOURS}, #line 97 "libgearman/command.gperf" {"JOB_ASSIGN_UNIQ", GEARMAN_COMMAND_JOB_ASSIGN_UNIQ }, +#line 74 "libgearman/command.gperf" + {"JOB_CREATED", GEARMAN_COMMAND_JOB_CREATED}, +#line 88 "libgearman/command.gperf" + {"SET_CLIENT_ID", GEARMAN_COMMAND_SET_CLIENT_ID}, +#line 72 "libgearman/command.gperf" + {"NOOP", GEARMAN_COMMAND_NOOP}, +#line 93 "libgearman/command.gperf" + {"OPTION_RES", GEARMAN_COMMAND_OPTION_RES}, +#line 101 "libgearman/command.gperf" + {"SUBMIT_JOB_SCHED", GEARMAN_COMMAND_SUBMIT_JOB_SCHED }, +#line 108 "libgearman/command.gperf" + {"STATUS_RES_UNIQUE", GEARMAN_COMMAND_STATUS_RES_UNIQUE}, +#line 82 "libgearman/command.gperf" + {"ECHO_REQ", GEARMAN_COMMAND_ECHO_REQ }, +#line 76 "libgearman/command.gperf" + {"NO_JOB", GEARMAN_COMMAND_NO_JOB}, +#line 91 "libgearman/command.gperf" + {"WORK_EXCEPTION", GEARMAN_COMMAND_WORK_EXCEPTION }, #line 92 "libgearman/command.gperf" {"OPTION_REQ", GEARMAN_COMMAND_OPTION_REQ}, -#line 70 "libgearman/command.gperf" - {"PRE_SLEEP", GEARMAN_COMMAND_PRE_SLEEP}, -#line 82 "libgearman/command.gperf" - {"ECHO_REQ", GEARMAN_COMMAND_ECHO_REQ } +#line 67 "libgearman/command.gperf" + {"CAN_DO", GEARMAN_COMMAND_CAN_DO}, +#line 68 "libgearman/command.gperf" + {"CANT_DO", GEARMAN_COMMAND_CANT_DO}, +#line 104 "libgearman/command.gperf" + {"SUBMIT_REDUCE_JOB_BACKGROUND", GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND}, +#line 77 "libgearman/command.gperf" + {"JOB_ASSIGN", GEARMAN_COMMAND_JOB_ASSIGN }, +#line 69 "libgearman/command.gperf" + {"RESET_ABILITIES", GEARMAN_COMMAND_RESET_ABILITIES}, +#line 85 "libgearman/command.gperf" + {"ERROR", GEARMAN_COMMAND_ERROR} }; const struct gearman_command_string_st * @@ -290,8 +294,8 @@ goto compare; } break; - case 7: - if (len == 6) + case 5: + if (len == 9) { resword = &gearman_command_string_st[1]; goto compare; @@ -312,278 +316,292 @@ } break; case 10: - if (len == 14) + if (len == 9) { resword = &gearman_command_string_st[4]; goto compare; } break; - case 11: - if (len == 10) + case 13: + if (len == 12) { resword = &gearman_command_string_st[5]; goto compare; } break; - case 12: - if (len == 6) + case 16: + if (len == 10) { resword = &gearman_command_string_st[6]; goto compare; } break; - case 15: - if (len == 9) + case 17: + if (len == 11) { resword = &gearman_command_string_st[7]; goto compare; } break; - case 16: - if (len == 10) + case 19: + if (len == 13) { resword = &gearman_command_string_st[8]; goto compare; } break; - case 17: - if (len == 11) + case 20: + if (len == 14) { resword = &gearman_command_string_st[9]; goto compare; } break; - case 18: - if (len == 12) + case 21: + if (len == 10) { resword = &gearman_command_string_st[10]; goto compare; } break; - case 19: - if (len == 13) + case 23: + if (len == 17) { resword = &gearman_command_string_st[11]; goto compare; } break; - case 20: - if (len == 14) + case 24: + if (len == 18) { resword = &gearman_command_string_st[12]; goto compare; } break; - case 21: - if (len == 10) + case 25: + if (len == 14) { resword = &gearman_command_string_st[13]; goto compare; } break; - case 22: - if (len == 11) + case 26: + if (len == 10) { resword = &gearman_command_string_st[14]; goto compare; } break; - case 23: - if (len == 17) + case 27: + if (len == 6) { resword = &gearman_command_string_st[15]; goto compare; } break; - case 24: - if (len == 18) + case 28: + if (len == 17) { resword = &gearman_command_string_st[16]; goto compare; } break; - case 25: - if (len == 14) + case 29: + if (len == 13) { resword = &gearman_command_string_st[17]; goto compare; } break; - case 26: - if (len == 10) + case 30: + if (len == 9) { resword = &gearman_command_string_st[18]; goto compare; } break; - case 28: - if (len == 17) + case 31: + if (len == 15) { resword = &gearman_command_string_st[19]; goto compare; } break; - case 29: - if (len == 13) + case 32: + if (len == 16) { resword = &gearman_command_string_st[20]; goto compare; } break; - case 30: - if (len == 4) + case 34: + if (len == 13) { resword = &gearman_command_string_st[21]; goto compare; } break; - case 31: - if (len == 10) + case 35: + if (len == 14) { resword = &gearman_command_string_st[22]; goto compare; } break; - case 32: - if (len == 16) + case 36: + if (len == 20) { resword = &gearman_command_string_st[23]; goto compare; } break; - case 34: - if (len == 13) + case 37: + if (len == 21) { resword = &gearman_command_string_st[24]; goto compare; } break; - case 35: - if (len == 14) + case 38: + if (len == 17) { resword = &gearman_command_string_st[25]; goto compare; } break; - case 36: - if (len == 15) + case 39: + if (len == 8) { resword = &gearman_command_string_st[26]; goto compare; } break; - case 38: - if (len == 17) + case 40: + if (len == 9) { resword = &gearman_command_string_st[27]; goto compare; } break; - case 39: - if (len == 8) + case 41: + if (len == 15) { resword = &gearman_command_string_st[28]; goto compare; } break; - case 40: - if (len == 9) + case 42: + if (len == 11) { resword = &gearman_command_string_st[29]; goto compare; } break; - case 41: - if (len == 5) + case 44: + if (len == 13) { resword = &gearman_command_string_st[30]; goto compare; } break; - case 42: - if (len == 6) + case 45: + if (len == 4) { resword = &gearman_command_string_st[31]; goto compare; } break; - case 43: - if (len == 7) + case 46: + if (len == 10) { resword = &gearman_command_string_st[32]; goto compare; } break; - case 44: - if (len == 28) + case 47: + if (len == 16) { resword = &gearman_command_string_st[33]; goto compare; } break; - case 46: - if (len == 15) + case 48: + if (len == 17) { resword = &gearman_command_string_st[34]; goto compare; } break; - case 47: - if (len == 16) + case 49: + if (len == 8) { resword = &gearman_command_string_st[35]; goto compare; } break; - case 48: - if (len == 17) + case 52: + if (len == 6) { resword = &gearman_command_string_st[36]; goto compare; } break; - case 49: - if (len == 13) + case 55: + if (len == 14) { resword = &gearman_command_string_st[37]; goto compare; } break; - case 50: - if (len == 9) + case 56: + if (len == 10) { resword = &gearman_command_string_st[38]; goto compare; } break; - case 56: - if (len == 15) + case 57: + if (len == 6) { resword = &gearman_command_string_st[39]; goto compare; } break; - case 61: - if (len == 10) + case 58: + if (len == 7) { resword = &gearman_command_string_st[40]; goto compare; } break; - case 65: - if (len == 9) + case 59: + if (len == 28) { resword = &gearman_command_string_st[41]; goto compare; } break; - case 69: - if (len == 8) + case 61: + if (len == 10) { resword = &gearman_command_string_st[42]; goto compare; } break; + case 66: + if (len == 15) + { + resword = &gearman_command_string_st[43]; + goto compare; + } + break; + case 71: + if (len == 5) + { + resword = &gearman_command_string_st[44]; + goto compare; + } + break; } return 0; compare: @@ -597,5 +615,5 @@ } return 0; } -#line 109 "libgearman/command.gperf" +#line 111 "libgearman/command.gperf" --- libgearman/connection.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/connection.cc 2014-08-03 23:19:28.255335115 +0800 @@ -678,6 +678,8 @@ ERR_error_string_n(SSL_get_error(_ssl, 0), errorString, sizeof(errorString)); return gearman_error(universal, GEARMAN_COULD_NOT_CONNECT, errorString); } + + SSL_set_connect_state(_ssl); } #endif @@ -858,7 +860,11 @@ case SSL_ERROR_SSL: default: { - char errorString[80]; + if (ERR_peek_last_error()) + { + ssl_error= ERR_peek_last_error(); + } + char errorString[SSL_ERROR_SIZE]; ERR_error_string_n(ssl_error, errorString, sizeof(errorString)); close_socket(); return gearman_universal_set_error(universal, GEARMAN_LOST_CONNECTION, GEARMAN_AT, "SSL failure(%s)", errorString); @@ -1163,7 +1169,11 @@ case SSL_ERROR_SSL: default: { - char errorString[80]; + if (ERR_peek_last_error()) + { + ssl_error= ERR_peek_last_error(); + } + char errorString[SSL_ERROR_SIZE]; ERR_error_string_n(ssl_error, errorString, sizeof(errorString)); close_socket(); return gearman_universal_set_error(universal, GEARMAN_LOST_CONNECTION, GEARMAN_AT, "SSL failure(%s)", errorString); --- libgearman/execute.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/execute.cc 2014-08-02 20:27:11.055295057 +0800 @@ -68,6 +68,20 @@ return GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG; } +static inline gearman_command_t pick_command_by_priority_epoch(const gearman_job_priority_t &arg) +{ + if (arg == GEARMAN_JOB_PRIORITY_NORMAL) + { + return GEARMAN_COMMAND_SUBMIT_JOB_EPOCH; + } + else if (arg == GEARMAN_JOB_PRIORITY_HIGH) + { + return GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH; + } + + return GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH; +} + gearman_task_st *gearman_execute(gearman_client_st *client_shell, @@ -117,7 +131,7 @@ case GEARMAN_TASK_ATTR_EPOCH: task= add_task(*client, context, - GEARMAN_COMMAND_SUBMIT_JOB_EPOCH, + pick_command_by_priority_epoch(task_attr->priority), function, unique, arguments->value, --- libgearman/interface/universal.hpp 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/interface/universal.hpp 2014-08-03 23:20:15.535334766 +0800 @@ -68,12 +68,18 @@ bool non_blocking; bool no_new_data; bool _ssl; + struct gearman_vector_st *_ssl_ca_file; + struct gearman_vector_st *_ssl_certificate; + struct gearman_vector_st *_ssl_key; Options() : dont_track_packets(false), non_blocking(false), no_new_data(false), - _ssl(false) + _ssl(false), + _ssl_ca_file(NULL), + _ssl_certificate(NULL), + _ssl_key(NULL) { } } options; gearman_verbose_t verbose; @@ -208,6 +214,11 @@ const char* ssl_ca_file() const { + if (options._ssl_ca_file && options._ssl_ca_file->size()) + { + return options._ssl_ca_file->c_str(); + } + if (getenv("GEARMAND_CA_CERTIFICATE")) { return getenv("GEARMAND_CA_CERTIFICATE"); @@ -216,8 +227,27 @@ return GEARMAND_CA_CERTIFICATE; } + void ssl_ca_file(const char* ssl_ca_file_) + { + gearman_string_free(options._ssl_ca_file); + size_t ssl_ca_file_size_= 0; + if (ssl_ca_file_ && (ssl_ca_file_size_= strlen(ssl_ca_file_))) + { + options._ssl_ca_file= gearman_string_create(NULL, ssl_ca_file_, ssl_ca_file_size_); + } + else + { + options._ssl_ca_file= NULL; + } + } + const char* ssl_certificate() const { + if (options._ssl_certificate && options._ssl_certificate->size()) + { + return options._ssl_certificate->c_str(); + } + if (getenv("GEARMAN_CLIENT_PEM")) { return getenv("GEARMAN_CLIENT_PEM"); @@ -226,8 +256,27 @@ return GEARMAN_CLIENT_PEM; } + void ssl_certificate(const char *ssl_certificate_) + { + gearman_string_free(options._ssl_certificate); + size_t ssl_certificate_size_= 0; + if (ssl_certificate_ && (ssl_certificate_size_= strlen(ssl_certificate_))) + { + options._ssl_certificate= gearman_string_create(NULL, ssl_certificate_, ssl_certificate_size_); + } + else + { + options._ssl_certificate= NULL; + } + } + const char* ssl_key() const { + if (options._ssl_key && options._ssl_key->size()) + { + return options._ssl_key->c_str(); + } + if (getenv("GEARMAN_CLIENT_KEY")) { return getenv("GEARMAN_CLIENT_KEY"); @@ -236,6 +285,20 @@ return GEARMAN_CLIENT_KEY; } + void ssl_key(const char *ssl_key_) + { + gearman_string_free(options._ssl_key); + size_t ssl_key_size_= 0; + if (ssl_key_ && (ssl_key_size_= strlen(ssl_key_))) + { + options._ssl_key= gearman_string_create(NULL, ssl_key_, ssl_key_size_); + } + else + { + options._ssl_key= NULL; + } + } + private: bool init_ssl(); --- libgearman/protocol/submit.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/protocol/submit.cc 2014-08-02 19:43:57.085314194 +0800 @@ -113,6 +113,7 @@ gearman_return_t submit_epoch(gearman_universal_st& universal, gearman_packet_st& message, const gearman_unique_t& unique, + const gearman_command_t command, const gearman_string_t &function, const gearman_string_t &workload, time_t when) @@ -155,7 +156,7 @@ return gearman_packet_create_args(universal, message, GEARMAN_MAGIC_REQUEST, - GEARMAN_COMMAND_SUBMIT_JOB_EPOCH, + command, args, args_size, 4); } --- libgearman/protocol/submit.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/protocol/submit.h 2014-08-03 03:32:16.555202229 +0800 @@ -58,6 +58,7 @@ gearman_return_t submit_epoch(gearman_universal_st&, gearman_packet_st& message, const gearman_unique_t& unique, + const gearman_command_t command, const gearman_string_t &function, const gearman_string_t &workload, time_t when); --- libgearman/run.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/run.cc 2014-08-02 20:13:22.835301167 +0800 @@ -199,6 +199,8 @@ task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG || task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG || task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH || + task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH || + task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH || task->send.command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND) { task->error_code(GEARMAN_SUCCESS); --- libgearman/universal.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/universal.cc 2014-08-03 23:20:50.125334511 +0800 @@ -183,6 +183,15 @@ self.timeout= timeout; } +void gearman_universal_set_ssl(gearman_universal_st &self, bool ssl, + const char *ca_file, const char *certificate, const char *key_file) +{ + self.ssl(ssl); + self.ssl_ca_file(ca_file); + self.ssl_certificate(certificate); + self.ssl_key(key_file); +} + void gearman_set_log_fn(gearman_universal_st &self, gearman_log_fn *function, void *context, gearman_verbose_t verbose) { @@ -470,10 +479,31 @@ if (ssl()) { #if defined(HAVE_SSL) && HAVE_SSL + // Check these files exist or not to avoid coredump. + FILE *file= NULL; + if ((file= fopen(ssl_ca_file(), "r")) == NULL) + { + gearman_universal_set_error(*this, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT, "Failed to open CA certificate %s (%d: %s)", ssl_ca_file(), errno, strerror(errno)); + return false; + } + fclose(file); + if ((file= fopen(ssl_certificate(), "r")) == NULL) + { + gearman_universal_set_error(*this, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT, "Failed to open certificate %s (%d: %s)", ssl_certificate(), errno, strerror(errno)); + return false; + } + fclose(file); + if ((file= fopen(ssl_key(), "r")) == NULL) + { + gearman_universal_set_error(*this, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT, "Failed to open certificate key %s (%d: %s)", ssl_key(), errno, strerror(errno)); + return false; + } + fclose(file); + SSL_load_error_strings(); SSL_library_init(); - if ((_ctx_ssl= SSL_CTX_new(TLSv1_client_method())) == NULL) + if ((_ctx_ssl= SSL_CTX_new(SSLv23_client_method())) == NULL) { gearman_universal_set_error(*this, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT, "CyaTLSv1_client_method() failed"); return false; @@ -496,6 +526,12 @@ gearman_universal_set_error(*this, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT, "Failed to load certificate key %s", ssl_key()); return false; } + + if (SSL_CTX_check_private_key(_ctx_ssl) != SSL_SUCCESS) + { + gearman_universal_set_error(*this, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT, "Failed to check private key"); + return false; + } #endif // defined(HAVE_SSL) && HAVE_SSL } --- libgearman/universal.hpp 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/universal.hpp 2014-07-14 15:41:04.485239304 +0800 @@ -57,6 +57,9 @@ int gearman_universal_timeout(gearman_universal_st &self); +void gearman_universal_set_ssl(gearman_universal_st &self, bool ssl, + const char *ca_file, const char *certificate, const char *key_file); + void gearman_universal_set_namespace(gearman_universal_st &self, const char *namespace_key, size_t namespace_key_size); gearman_return_t cancel_job(gearman_universal_st& universal, --- libgearman/worker.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/worker.cc 2014-07-15 00:43:21.945244238 +0800 @@ -415,6 +415,15 @@ } } +void gearman_worker_set_ssl(gearman_worker_st *worker_shell, bool ssl, + const char *ca_file, const char *certificate, const char *key_file) +{ + if (worker_shell && worker_shell->impl()) + { + gearman_universal_set_ssl(worker_shell->impl()->universal, ssl, ca_file, certificate, key_file); + } +} + void *gearman_worker_context(const gearman_worker_st *worker) { if (worker and worker->impl()) --- libgearman/worker.hpp 2014-02-12 08:05:28.000000000 +0800 +++ libgearman/worker.hpp 2014-07-07 14:23:58.669334929 +0800 @@ -134,6 +134,7 @@ void enable_ssl() { + return; if (getenv("GEARMAND_CA_CERTIFICATE")) { gearman_worker_add_options(_worker, GEARMAN_WORKER_SSL); --- libgearman-1.0/client.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-1.0/client.h 2014-08-03 17:34:07.805345396 +0800 @@ -172,6 +172,13 @@ void gearman_client_set_timeout(gearman_client_st *client, int timeout); /** + * See gearman_universal_set_ssl() for details. + */ +GEARMAN_API +void gearman_client_set_ssl(gearman_client_st *client, bool ssl, + const char *ca_file, const char *certificate, const char *key_file); + +/** * Get the application context for a client. * * @param[in] client Structure previously initialized with @@ -570,6 +577,54 @@ gearman_return_t *ret_ptr); /** + * Add an epoch task to be run in parallel. See + * gearman_client_add_task() for details. + * @param[in] when The time when to run. + */ +GEARMAN_API +gearman_task_st *gearman_client_add_task_epoch(gearman_client_st *client, + gearman_task_st *task, + void *context, + const char *function_name, + const char *unique, + const void *workload, + size_t workload_size, + time_t when, + gearman_return_t *ret_ptr); + +/** + * Add an epoch task to be run in parallel. See + * gearman_client_add_task() for details. + * @param[in] when The time when to run. + */ +GEARMAN_API +gearman_task_st *gearman_client_add_task_high_epoch(gearman_client_st *client, + gearman_task_st *task, + void *context, + const char *function_name, + const char *unique, + const void *workload, + size_t workload_size, + time_t when, + gearman_return_t *ret_ptr); + +/** + * Add an epoch task to be run in parallel. See + * gearman_client_add_task() for details. + * @param[in] when The time when to run. + */ +GEARMAN_API +gearman_task_st *gearman_client_add_task_low_epoch(gearman_client_st *client, + gearman_task_st *task, + void *context, + const char *function_name, + const char *unique, + const void *workload, + size_t workload_size, + time_t when, + gearman_return_t *ret_ptr); + +/** * Add task to get the status for a backgound task in parallel. * * @param[in] client Structure previously initialized with --- libgearman-1.0/protocol.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-1.0/protocol.h 2014-08-02 19:19:49.235324876 +0800 @@ -87,6 +87,8 @@ GEARMAN_COMMAND_JOB_ASSIGN_ALL, /* J->W: HANDLE[0]FUNC[0]UNIQ[0]REDUCER[0]ARGS */ GEARMAN_COMMAND_GET_STATUS_UNIQUE, /* C->J: UNIQUE */ GEARMAN_COMMAND_STATUS_RES_UNIQUE, /* J->C: UNIQUE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM[0]CLIENT_COUNT */ + GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH, + GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH, GEARMAN_COMMAND_MAX /* Always add new commands before this. */ }; --- libgearman-1.0/worker.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-1.0/worker.h 2014-07-15 00:41:40.291909980 +0800 @@ -180,6 +180,13 @@ void gearman_worker_set_timeout(gearman_worker_st *worker, int timeout); /** + * See gearman_universal_set_ssl() for details. + */ +GEARMAN_API +void gearman_worker_set_ssl(gearman_worker_st *worker, bool ssl, + const char *ca_file, const char *certificate, const char *key_file); + +/** * Get the application context for a worker. * * @param[in] worker Structure previously initialized with --- libgearman-server/function.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/function.cc 2014-08-02 19:13:05.415327855 +0800 @@ -105,6 +105,14 @@ function->function_name[function_name_size]= 0; function->function_name_size= function_name_size; function->worker_list= NULL; + memset(function->job_epoch_list, 0, + sizeof(gearman_server_job_st *) * GEARMAN_JOB_PRIORITY_MAX); + memset(function->job_epoch_end, 0, + sizeof(gearman_server_job_st *) * GEARMAN_JOB_PRIORITY_MAX); + memset(function->job_bg_list, 0, + sizeof(gearman_server_job_st *) * GEARMAN_JOB_PRIORITY_MAX); + memset(function->job_bg_end, 0, + sizeof(gearman_server_job_st *) * GEARMAN_JOB_PRIORITY_MAX); memset(function->job_list, 0, sizeof(gearman_server_job_st *) * GEARMAN_JOB_PRIORITY_MAX); memset(function->job_end, 0, --- libgearman-server/gearmand.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/gearmand.cc 2014-08-04 21:21:24.478741328 +0800 @@ -175,6 +175,13 @@ delete worker; } + gearman_server_epoch_job_st *epoch_job= NULL; + while ((epoch_job= server.epoch_job_list)) + { + server.epoch_job_list= epoch_job->next; + gearman_server_epoch_job_free(epoch_job); + } + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "removing queue: %s", (server.queue_version == QUEUE_VERSION_CLASS) ? "CLASS" : "FUNCTION"); if (server.queue_version == QUEUE_VERSION_CLASS) { @@ -1202,6 +1209,7 @@ uint32_t hashtable_buckets) { server.state.queue_startup= false; + server.state.epoch_startup= false; server.flags.round_robin= round_robin_arg; server.flags.threaded= false; server.shutdown= false; @@ -1223,6 +1231,7 @@ server.free_job_list= NULL; server.free_client_list= NULL; server.free_worker_list= NULL; + server.epoch_job_list= NULL; server.queue_version= QUEUE_VERSION_NONE; server.queue.object= NULL; --- libgearman-server/gearmand_con.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/gearmand_con.cc 2014-08-03 05:55:15.715138936 +0800 @@ -315,35 +315,36 @@ server_worker != NULL; server_worker= server_worker->con_next) { - if (server_worker->function->job_count != 0) + if (server_worker->function and server_worker->function->job_count != 0) { for (gearman_job_priority_t priority= GEARMAN_JOB_PRIORITY_HIGH; priority != GEARMAN_JOB_PRIORITY_MAX; priority= gearman_job_priority_t(int(priority) +1)) { - gearman_server_job_st *server_job; - server_job= server_worker->function->job_list[priority]; - int64_t current_time= (int64_t)time(NULL); - - while(server_job && - server_job->when != 0 && - server_job->when > current_time) + gearman_server_job_st *server_job= NULL; + if (server_job == NULL) { - server_job= server_job->function_next; + server_job= server_worker->function->job_epoch_list[priority]; + server_job and ((server_job->when <= current_time) or (server_job= NULL)); } - - if (server_job != NULL) + if (server_job == NULL) { + server_job= server_worker->function->job_list[priority]; + } + if (server_job == NULL) + { + server_job= server_worker->function->job_bg_list[priority]; + } + if (server_job != NULL) + { if (server_job->ignore_job) { /* This is only happens when a client disconnects from a foreground job. We do this because we don't want to run the job anymore. */ - server_job->ignore_job= false; - - gearman_server_job_free(gearman_server_job_take(server_con)); - + server_job->function->job_count--; + gearman_server_job_free(server_job); return gearman_server_job_peek(server_con); } @@ -352,7 +353,6 @@ } } } - return NULL; } @@ -378,47 +378,50 @@ } } + int64_t current_time= (int64_t)time(NULL); + gearman_server_job_st *server_job= NULL; gearman_job_priority_t priority; - for (priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_LOW; - priority= gearman_job_priority_t(int(priority) +1)) + if (server_job == NULL) { - if (server_worker->function->job_list[priority]) + for (priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_MAX; + priority= gearman_job_priority_t(int(priority) +1)) { - break; + if ((server_job= server_worker->function->job_epoch_list[priority]) and + ((server_job->when <= current_time) or (server_job= NULL))) + { + server_job->function->job_epoch_list[priority]= server_job->function_next; + break; + } } } - - gearman_server_job_st *server_job= server_worker->function->job_list[priority]; - gearman_server_job_st *previous_job= server_job; - - int64_t current_time= (int64_t)time(NULL); - - while (server_job and server_job->when != 0 and server_job->when > current_time) + if (server_job == NULL) { - previous_job= server_job; - server_job= server_job->function_next; - } - - if (server_job) - { - if (server_job->function->job_list[priority] == server_job) - { - // If it's the head of the list, advance it - server_job->function->job_list[priority]= server_job->function_next; - } - else + for (priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_MAX; + priority= gearman_job_priority_t(int(priority) +1)) { - // Otherwise, just remove the item from the list - previous_job->function_next= server_job->function_next; + if ((server_job= server_worker->function->job_list[priority])) + { + server_job->function->job_list[priority]= server_job->function_next; + break; + } } - - // If it's the tail of the list, move the tail back - if (server_job->function->job_end[priority] == server_job) + } + if (server_job == NULL) + { + for (priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_MAX; + priority= gearman_job_priority_t(int(priority) +1)) { - server_job->function->job_end[priority]= previous_job; + if ((server_job= server_worker->function->job_bg_list[priority])) + { + server_job->function->job_bg_list[priority]= server_job->function_next; + break; + } } - server_job->function->job_count--; + } + if (server_job) + { + server_job->function->job_count--; server_job->worker= server_worker; GEARMAND_LIST_ADD(server_worker->job, server_job, worker_); server_job->function->job_running++; --- libgearman-server/io.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/io.cc 2014-08-03 23:21:07.545334383 +0800 @@ -159,6 +159,10 @@ case SSL_ERROR_SSL: default: { // All other errors + if (ERR_peek_last_error()) + { + ssl_error= ERR_peek_last_error(); + } char errorString[SSL_ERROR_SIZE]; ERR_error_string_n(ssl_error, errorString, sizeof(errorString)); ret= GEARMAND_LOST_CONNECTION; @@ -338,6 +342,10 @@ case SSL_ERROR_SSL: default: { + if (ERR_peek_last_error()) + { + ssl_error= ERR_peek_last_error(); + } char errorString[SSL_ERROR_SIZE]; ERR_error_string_n(ssl_error, errorString, sizeof(errorString)); _connection_close(connection); --- libgearman-server/job.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/job.cc 2014-08-04 21:14:47.032071046 +0800 @@ -46,6 +46,7 @@ #include "gear_config.h" #include "libgearman-server/common.h" #include +#include #include @@ -339,7 +340,9 @@ gearmand_error_t gearman_server_job_queue(gearman_server_job_st *job) { - if (job->worker) + int64_t current_time= time(NULL); + + if (job->when <= current_time and job->worker) { job->retries++; if (Server->job_retries != 0 && Server->job_retries == job->retries) @@ -389,7 +392,7 @@ } /* Queue NOOP for possible sleeping workers. */ - if (job->function->worker_list != NULL) + if (job->when <= current_time and job->function->worker_list != NULL) { gearman_server_worker_st *worker= job->function->worker_list; uint32_t noop_sent= 0; @@ -422,18 +425,163 @@ } /* Queue the job to be run. */ - if (job->function->job_list[job->priority] == NULL) + if (job->job_queued == false) { - job->function->job_list[job->priority]= job; + if (job->function->job_list[job->priority] == NULL) + { + job->function->job_list[job->priority]= job; + } + else + { + job->function->job_end[job->priority]->function_next= job; + } + job->function->job_end[job->priority]= job; + } + else if (job->when == 0) + { + if (job->function->job_bg_list[job->priority] == NULL) + { + job->function->job_bg_list[job->priority]= job; + } + else + { + job->function->job_bg_end[job->priority]->function_next= job; + } + job->function->job_bg_end[job->priority]= job; } else { - job->function->job_end[job->priority]->function_next= job; + gearman_server_job_st *job0= NULL, *job1= job->function->job_epoch_list[job->priority]; + for (; job1 and job1->when <= job->when; job0= job1, job1= job1->function_next); + if (job0 == NULL) + { + job->function->job_epoch_list[job->priority]= job; + } + else + { + job0->function_next= job; + } + if (job1 == NULL) + { + job->function->job_epoch_end[job->priority]= job; + } + else + { + job->function_next= job1; + } + + if (job->when > current_time) + { + gearman_server_epoch_job_push(job->when, job->function); + } } - job->function->job_end[job->priority]= job; job->function->job_count++; return GEARMAND_SUCCESS; } #pragma GCC diagnostic pop + +void gearman_server_epoch_job_free(gearman_server_epoch_job_st *epoch_job) +{ + if (epoch_job == NULL) + { + return; + } + gearman_server_epoch_function_st *epoch_function= NULL; + while ((epoch_function= epoch_job->function_list)) + { + epoch_job->function_list= epoch_function->next; + delete epoch_function; + } + delete epoch_job; +} + +void gearman_server_epoch_job_push(int64_t when, gearman_server_function_st *function) +{ + gearman_server_st *server= gearmand_server(Gearmand()); + if (!server->state.epoch_startup) + { + signal(SIGALRM, gearman_server_epoch_job_pop); + server->state.epoch_startup= true; + } + + gearman_server_epoch_job_st *epoch_job= NULL, *epoch_job0= NULL, *epoch_job1= server->epoch_job_list; + for (; epoch_job1 and epoch_job1->when <= when; epoch_job0= epoch_job1, epoch_job1= epoch_job1->next); + if (epoch_job0 == NULL or epoch_job0->when < when) + { + epoch_job= new gearman_server_epoch_job_st; + epoch_job->when= when; + epoch_job->next= epoch_job1; + epoch_job->function_list= NULL; + if (epoch_job0 == NULL) + { + server->epoch_job_list= epoch_job; + alarm(when - time(NULL)); + } + else + { + epoch_job0->next= epoch_job; + } + } + else + { + epoch_job= epoch_job0; + } + + gearman_server_epoch_function_st *epoch_function= NULL, *epoch_function1= epoch_job->function_list; + for (; epoch_function1 and epoch_function1->function != function; epoch_function1= epoch_function1->next); + if (epoch_function1 == NULL) + { + epoch_function= new gearman_server_epoch_function_st; + epoch_function->function= function; + epoch_function->next= epoch_job->function_list; + epoch_job->function_list= epoch_function; + } +} + +void gearman_server_epoch_job_pop(int) +{ + gearman_server_worker_st *worker= NULL; + gearman_server_epoch_job_st *epoch_job= NULL; + gearman_server_epoch_function_st *epoch_function= NULL; + gearman_server_st *server= gearmand_server(Gearmand()); + while ((epoch_job= server->epoch_job_list) and epoch_job->when <= time(NULL)) + { + while ((epoch_function= epoch_job->function_list)) + { + if ((worker= epoch_function->function->worker_list)) + { + do + { + if (worker->con->is_sleeping and !worker->con->is_noop_sent) + { + gearmand_error_t ret= gearman_server_io_packet_add(worker->con, false, + GEARMAN_MAGIC_RESPONSE, + GEARMAN_COMMAND_NOOP, NULL); + if (gearmand_failed(ret)) + { + gearmand_log_gerror_warn(GEARMAN_DEFAULT_LOG_PARAM, ret, "Failed to send NOOP packet to %s:%s", worker->con->host(), worker->con->port()); + } + else + { + worker->con->is_noop_sent= true; + } + } + } + while ((worker= worker->function_next) and worker != epoch_function->function->worker_list); + } + + epoch_job->function_list= epoch_function->next; + delete epoch_function; + } + + server->epoch_job_list= epoch_job->next; + delete epoch_job; + } + + if (epoch_job != NULL) + { + alarm(epoch_job->when - time(NULL)); + } +} --- libgearman-server/job.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/job.h 2014-08-04 21:11:15.388735788 +0800 @@ -128,6 +128,12 @@ GEARMAN_API gearmand_error_t gearman_server_job_queue(gearman_server_job_st *server_job); +void gearman_server_epoch_job_free(gearman_server_epoch_job_st*); + +void gearman_server_epoch_job_push(int64_t, gearman_server_function_st*); + +void gearman_server_epoch_job_pop(int); + uint32_t _server_job_hash(const char *key, size_t key_size); void *_proc(void *data); --- libgearman-server/plugins/protocol/gear/protocol.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/plugins/protocol/gear/protocol.cc 2014-08-03 23:21:40.455334140 +0800 @@ -393,6 +393,10 @@ case SSL_ERROR_SSL: case SSL_ERROR_ZERO_RETURN: default: + if (ERR_peek_last_error()) + { + cyassl_error= ERR_peek_last_error(); + } char cyassl_error_buffer[SSL_ERROR_SIZE]= { 0 }; ERR_error_string_n(cyassl_error, cyassl_error_buffer, sizeof(cyassl_error_buffer)); return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_LOST_CONNECTION, "%s(%d)", @@ -418,9 +422,9 @@ Gear::Gear() : Plugin("Gear"), _port(GEARMAN_DEFAULT_TCP_PORT_STRING), - _ssl_ca_file(GEARMAND_CA_CERTIFICATE), - _ssl_certificate(GEARMAND_SERVER_PEM), - _ssl_key(GEARMAND_SERVER_KEY), + _ssl_ca_file(""), + _ssl_certificate(""), + _ssl_key(""), opt_ssl(false) { command_line_options().add_options() @@ -477,19 +481,40 @@ if (opt_ssl) { - if (getenv("GEARMAND_CA_CERTIFICATE")) + if (_ssl_ca_file.empty()) { - _ssl_ca_file= getenv("GEARMAND_CA_CERTIFICATE"); + if (getenv("GEARMAND_CA_CERTIFICATE")) + { + _ssl_ca_file= getenv("GEARMAND_CA_CERTIFICATE"); + } + else + { + _ssl_ca_file= GEARMAND_CA_CERTIFICATE; + } } - if (getenv("GEARMAND_SERVER_PEM")) + if (_ssl_certificate.empty()) { - _ssl_certificate= getenv("GEARMAND_SERVER_PEM"); + if (getenv("GEARMAND_SERVER_PEM")) + { + _ssl_certificate= getenv("GEARMAND_SERVER_PEM"); + } + else + { + _ssl_certificate= GEARMAND_SERVER_PEM; + } } - if (getenv("GEARMAND_SERVER_KEY")) + if (_ssl_key.empty()) { - _ssl_key= getenv("GEARMAND_SERVER_KEY"); + if (getenv("GEARMAND_SERVER_KEY")) + { + _ssl_key= getenv("GEARMAND_SERVER_KEY"); + } + else + { + _ssl_key= GEARMAND_SERVER_KEY; + } } gearmand->init_ssl(); @@ -512,6 +537,12 @@ } gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Loading certificate key : %s", _ssl_key.c_str()); + if (SSL_CTX_check_private_key(gearmand->ctx_ssl()) != SSL_SUCCESS) + { + gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "SSL_CTX_check_private_key() cannot check certificate %s", _ssl_key.c_str()); + } + gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Checking certificate key : %s", _ssl_key.c_str()); + assert(gearmand->ctx_ssl()); } #endif --- libgearman-server/plugins/protocol/http/protocol.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/plugins/protocol/http/protocol.cc 2014-08-02 20:17:08.195299505 +0800 @@ -149,6 +149,8 @@ case GEARMAN_COMMAND_JOB_ASSIGN_ALL: case GEARMAN_COMMAND_GET_STATUS_UNIQUE: case GEARMAN_COMMAND_STATUS_RES_UNIQUE: + case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH: + case GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH: case GEARMAN_COMMAND_MAX: gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Bad packet command: gearmand_command_t:%s", --- libgearman-server/plugins/queue/mysql/queue.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/plugins/queue/mysql/queue.cc 2014-07-24 15:36:01.585309439 +0800 @@ -78,6 +78,10 @@ MYSQL *con; MYSQL_STMT *add_stmt; MYSQL_STMT *done_stmt; + bool mysql_ssl; + std::string mysql_ca; + std::string mysql_cert; + std::string mysql_key; std::string mysql_host; std::string mysql_user; std::string mysql_password; @@ -97,9 +101,17 @@ Queue("MySQL"), con(NULL), add_stmt(NULL), - done_stmt(NULL) + done_stmt(NULL), + mysql_ssl(false), + mysql_ca(""), + mysql_cert(""), + mysql_key("") { command_line_options().add_options() + ("mysql-ssl", boost::program_options::bool_switch(&mysql_ssl)->default_value(false), "MySQL SSL enabled.") + ("mysql-ca", boost::program_options::value(&mysql_ca), "MySQL SSL CA file.") + ("mysql-cert", boost::program_options::value(&mysql_cert), "MySQL SSL certificate file.") + ("mysql-key", boost::program_options::value(&mysql_key), "MySQL SSL key file.") ("mysql-host", boost::program_options::value(&mysql_host)->default_value("localhost"), "MySQL host.") ("mysql-port", boost::program_options::value(&_port)->default_value(3306), "Port of server. (by default 3306)") ("mysql-user", boost::program_options::value(&mysql_user)->default_value(""), "MySQL user.") @@ -218,6 +230,15 @@ mysql_options(queue->con, MYSQL_READ_DEFAULT_GROUP, "gearmand"); + if (queue->mysql_ssl) + { + mysql_ssl_set(queue->con, + queue->mysql_key.c_str(), + queue->mysql_cert.c_str(), + queue->mysql_ca.c_str(), + NULL, NULL); + } + if (!mysql_real_connect(queue->con, queue->mysql_host.c_str(), queue->mysql_user.c_str(), --- libgearman-server/server.cc 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/server.cc 2014-08-02 19:28:11.715321169 +0800 @@ -205,11 +205,13 @@ /* Client requests. */ case GEARMAN_COMMAND_SUBMIT_JOB: case GEARMAN_COMMAND_SUBMIT_JOB_BG: + case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: case GEARMAN_COMMAND_SUBMIT_JOB_HIGH: case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG: + case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH: case GEARMAN_COMMAND_SUBMIT_JOB_LOW: case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG: - case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: + case GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH: { gearman_job_priority_t priority; @@ -220,7 +222,8 @@ priority= GEARMAN_JOB_PRIORITY_NORMAL; } else if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH or - packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG) + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG or + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH) { priority= GEARMAN_JOB_PRIORITY_HIGH; } @@ -232,7 +235,9 @@ if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG or packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG or packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG or - packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH) + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH or + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH or + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH) { server_client= NULL; } @@ -251,7 +256,9 @@ packet->arg_size[1], packet->arg[1], (int)packet->argc); int64_t when= 0; - if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH) + if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH or + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH or + packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH) { char *endptr; // @note stroll will set errno if error, but it might also leave errno --- libgearman-server/struct/function.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/struct/function.h 2014-08-04 16:32:33.288752942 +0800 @@ -49,7 +49,16 @@ gearman_server_function_st *prev; char *function_name; gearman_server_worker_st *worker_list; + struct gearman_server_job_st *job_epoch_list[GEARMAN_JOB_PRIORITY_MAX]; + struct gearman_server_job_st *job_epoch_end[GEARMAN_JOB_PRIORITY_MAX]; + struct gearman_server_job_st *job_bg_list[GEARMAN_JOB_PRIORITY_MAX]; + struct gearman_server_job_st *job_bg_end[GEARMAN_JOB_PRIORITY_MAX]; struct gearman_server_job_st *job_list[GEARMAN_JOB_PRIORITY_MAX]; - gearman_server_job_st *job_end[GEARMAN_JOB_PRIORITY_MAX]; + struct gearman_server_job_st *job_end[GEARMAN_JOB_PRIORITY_MAX]; }; +struct gearman_server_epoch_function_st +{ + gearman_server_function_st *function; + gearman_server_epoch_function_st *next; +}; --- libgearman-server/struct/job.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/struct/job.h 2014-08-04 21:14:43.728737682 +0800 @@ -68,3 +68,10 @@ char unique[GEARMAN_MAX_UNIQUE_SIZE]; char reducer[GEARMAN_FUNCTION_MAX_SIZE]; }; + +struct gearman_server_epoch_job_st +{ + int64_t when; + gearman_server_epoch_job_st *next; + gearman_server_epoch_function_st *function_list; +}; --- libgearman-server/struct/server.h 2014-02-12 08:05:28.000000000 +0800 +++ libgearman-server/struct/server.h 2014-08-04 20:17:17.312039668 +0800 @@ -80,6 +80,7 @@ } flags; struct State { bool queue_startup; + bool epoch_startup; } state; bool shutdown; bool shutdown_graceful; @@ -111,6 +112,7 @@ uint32_t hashtable_buckets; gearman_server_job_st **job_hash; gearman_server_job_st **unique_hash; + gearman_server_epoch_job_st *epoch_job_list; gearman_server_st() { --- tests/libgearman-1.0/protocol.cc 2014-02-12 08:05:28.000000000 +0800 +++ tests/libgearman-1.0/protocol.cc 2014-08-02 20:22:29.445297135 +0800 @@ -92,6 +92,10 @@ ASSERT_EQ(38, int(GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND)); ASSERT_EQ(39, int(GEARMAN_COMMAND_GRAB_JOB_ALL)); ASSERT_EQ(40, int(GEARMAN_COMMAND_JOB_ASSIGN_ALL)); + ASSERT_EQ(41, int(GEARMAN_COMMAND_GET_STATUS_UNIQUE)); + ASSERT_EQ(42, int(GEARMAN_COMMAND_STATUS_RES_UNIQUE)); + ASSERT_EQ(43, int(GEARMAN_COMMAND_SUBMIT_JOB_HIGH_EPOCH)); + ASSERT_EQ(44, int(GEARMAN_COMMAND_SUBMIT_JOB_LOW_EPOCH)); return TEST_SUCCESS; } --- util/instance.cc 2014-02-12 08:05:28.000000000 +0800 +++ util/instance.cc 2014-08-03 23:22:25.045333811 +0800 @@ -125,7 +125,7 @@ SSL_load_error_strings(); SSL_library_init(); - if ((_ctx_ssl= SSL_CTX_new(TLSv1_client_method())) == NULL) + if ((_ctx_ssl= SSL_CTX_new(SSLv23_client_method())) == NULL) { _last_error= "SSL_CTX_new error"; return false; @@ -154,6 +154,14 @@ _last_error= message.str(); return false; } + + if (SSL_CTX_check_private_key(_ctx_ssl) != SSL_SUCCESS) + { + std::stringstream message; + message << "Error check private key."; + _last_error= message.str(); + return false; + } #endif // defined(HAVE_CYASSL) && HAVE_CYASSL return true; } @@ -273,6 +281,8 @@ _last_error= "SSL_set_fd() failed"; return false; } + + SSL_set_connect_state(_ssl); } #endif @@ -315,6 +325,10 @@ case SSL_ERROR_SSL: default: { + if (ERR_peek_last_error()) + { + ssl_error= ERR_peek_last_error(); + } char cyassl_error_buffer[SSL_ERROR_SIZE]= { 0 }; ERR_error_string_n(ssl_error, cyassl_error_buffer, sizeof(cyassl_error_buffer)); _last_error= cyassl_error_buffer; @@ -398,6 +412,10 @@ case SSL_ERROR_SSL: default: { + if (ERR_peek_last_error()) + { + ssl_error= ERR_peek_last_error(); + } char cyassl_error_buffer[SSL_ERROR_SIZE]= { 0 }; ERR_error_string_n(ssl_error, cyassl_error_buffer, sizeof(cyassl_error_buffer)); _last_error= cyassl_error_buffer;