diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index be861385e78..188591e4d0d 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -127,7 +127,7 @@ static int send_response(struct http_conn *conn, int http_status, char *message) } else if (http_status == 400) { flb_sds_printf(&out, - "HTTP/1.1 400 Forbidden\r\n" + "HTTP/1.1 400 Bad Request\r\n" "Server: Fluent Bit v%s\r\n" "Content-Length: %i\r\n\r\n%s", FLB_VERSION_STR, @@ -211,6 +211,56 @@ static flb_sds_t tag_key(struct flb_http *ctx, msgpack_object *map) return NULL; } +static int process_pack_record(struct flb_http *ctx, struct flb_time *tm, + flb_sds_t tag, + msgpack_object *record) +{ + int ret; + + ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_set_timestamp(&ctx->log_encoder, tm); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_set_body_from_msgpack_object( + &ctx->log_encoder, + record); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + if (tag) { + ret = flb_input_log_append(ctx->ins, + tag, + flb_sds_len(tag), + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + else { + /* use default plugin Tag (it internal name, e.g: http.0 */ + ret = flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + return 0; +} + int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) { int ret; @@ -233,48 +283,19 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) tag_from_record = tag_key(ctx, obj); } - ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ctx->log_encoder, - &tm); + if (tag_from_record) { + ret = process_pack_record(ctx, &tm, tag_from_record, obj); + flb_sds_destroy(tag_from_record); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - &result.data); + else if (tag) { + ret = process_pack_record(ctx, &tm, tag, obj); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + else { + ret = process_pack_record(ctx, &tm, NULL, obj); } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - if (tag_from_record) { - flb_input_log_append(ctx->ins, - tag_from_record, - flb_sds_len(tag_from_record), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - - flb_sds_destroy(tag_from_record); - } - else if (tag) { - flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - else { - /* use default plugin Tag (it internal name, e.g: http.0 */ - flb_input_log_append(ctx->ins, NULL, 0, - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - } - else { - flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + if (ret != 0) { + goto log_event_error; } flb_log_event_encoder_reset(&ctx->log_encoder); @@ -289,48 +310,19 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) tag_from_record = tag_key(ctx, &record); } - ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ctx->log_encoder, - &tm); + if (tag_from_record) { + ret = process_pack_record(ctx, &tm, tag_from_record, &record); + flb_sds_destroy(tag_from_record); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - &record); + else if (tag) { + ret = process_pack_record(ctx, &tm, tag, &record); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + else { + ret = process_pack_record(ctx, &tm, NULL, &record); } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - if (tag_from_record) { - flb_input_log_append(ctx->ins, - tag_from_record, - flb_sds_len(tag_from_record), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - - flb_sds_destroy(tag_from_record); - } - else if (tag) { - flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - else { - /* use default plugin Tag (it internal name, e.g: http.0 */ - flb_input_log_append(ctx->ins, NULL, 0, - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - } - else { - flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + goto log_event_error; } /* TODO : Optimize this @@ -350,7 +342,6 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) result.data.type); msgpack_unpacked_destroy(&result); - return -1; } } @@ -358,6 +349,11 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) msgpack_unpacked_destroy(&result); return 0; + +log_event_error: + msgpack_unpacked_destroy(&result); + flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + return ret; } static ssize_t parse_payload_json(struct flb_http *ctx, flb_sds_t tag, @@ -390,10 +386,10 @@ static ssize_t parse_payload_json(struct flb_http *ctx, flb_sds_t tag, } /* Process the packaged JSON and return the last byte used */ - process_pack(ctx, tag, pack, out_size); + ret = process_pack(ctx, tag, pack, out_size); flb_free(pack); - return 0; + return ret; } static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag, @@ -548,14 +544,15 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, } if (type == HTTP_CONTENT_JSON) { - parse_payload_json(ctx, tag, request->data.data, request->data.len); + ret = parse_payload_json(ctx, tag, request->data.data, request->data.len); } else if (type == HTTP_CONTENT_URLENCODED) { ret = parse_payload_urlencoded(ctx, tag, request->data.data, request->data.len); - if (ret != 0) { - send_response(conn, 400, "error: invalid payload\n"); - return -1; - } + } + + if (ret != 0) { + send_response(conn, 400, "error: invalid payload\n"); + return -1; } return 0; @@ -685,6 +682,9 @@ int http_prot_handle(struct flb_http *ctx, struct http_conn *conn, if (ret == 0) { send_response(conn, ctx->successful_response_code, NULL); } + else { + send_response(conn, 400, "unable to process records\n"); + } return ret; } @@ -779,53 +779,25 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) { if (result.data.type == MSGPACK_OBJECT_MAP) { tag_from_record = NULL; + obj = &result.data; + if (ctx->tag_key) { - obj = &result.data; tag_from_record = tag_key(ctx, obj); } - ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ctx->log_encoder, - &tm); + if (tag_from_record) { + ret = process_pack_record(ctx, &tm, tag_from_record, obj); + flb_sds_destroy(tag_from_record); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - &result.data); + else if (tag) { + ret = process_pack_record(ctx, &tm, tag, obj); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + else { + ret = process_pack_record(ctx, &tm, NULL, obj); } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - if (tag_from_record) { - flb_input_log_append(ctx->ins, - tag_from_record, - flb_sds_len(tag_from_record), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - - flb_sds_destroy(tag_from_record); - } - else if (tag) { - flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - else { - /* use default plugin Tag (it internal name, e.g: http.0 */ - flb_input_log_append(ctx->ins, NULL, 0, - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - } - else { - flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + if (ret != 0) { + goto log_event_error; } flb_log_event_encoder_reset(&ctx->log_encoder); @@ -836,53 +808,19 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ { record = obj->via.array.ptr[i]; - tag_from_record = NULL; - if (ctx->tag_key) { - tag_from_record = tag_key(ctx, &record); + if (tag_from_record) { + ret = process_pack_record(ctx, &tm, tag_from_record, &record); + flb_sds_destroy(tag_from_record); } - - ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ctx->log_encoder, - &tm); + else if (tag) { + ret = process_pack_record(ctx, &tm, tag, &record); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - &record); + else { + ret = process_pack_record(ctx, &tm, NULL, &record); } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - if (tag_from_record) { - flb_input_log_append(ctx->ins, - tag_from_record, - flb_sds_len(tag_from_record), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - - flb_sds_destroy(tag_from_record); - } - else if (tag) { - flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - else { - /* use default plugin Tag (it internal name, e.g: http.0 */ - flb_input_log_append(ctx->ins, NULL, 0, - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - } - else { - flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + if (ret != 0) { + goto log_event_error; } /* TODO : Optimize this @@ -908,8 +846,12 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ } msgpack_unpacked_destroy(&result); - return 0; + +log_event_error: + flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + msgpack_unpacked_destroy(&result); + return -1; } static ssize_t parse_payload_json_ng(flb_sds_t tag, @@ -949,10 +891,10 @@ static ssize_t parse_payload_json_ng(flb_sds_t tag, } /* Process the packaged JSON and return the last byte used */ - process_pack_ng(ctx, tag, pack, out_size); + ret = process_pack_ng(ctx, tag, pack, out_size); flb_free(pack); - return 0; + return ret; } static int process_payload_ng(flb_sds_t tag, @@ -988,13 +930,13 @@ static int process_payload_ng(flb_sds_t tag, } if (type == HTTP_CONTENT_JSON) { - parse_payload_json_ng(tag, request); + return parse_payload_json_ng(tag, request); } else if (type == HTTP_CONTENT_URLENCODED) { ctx = (struct flb_http *) request->stream->user_data; payload = (char *) request->body; if (payload) { - parse_payload_urlencoded(ctx, tag, payload, cfl_sds_len(payload)); + return parse_payload_urlencoded(ctx, tag, payload, cfl_sds_len(payload)); } } @@ -1056,7 +998,12 @@ int http_prot_handle_ng(struct flb_http_request *request, ret = process_payload_ng(tag, request, response); flb_sds_destroy(tag); - send_response_ng(response, ctx->successful_response_code, NULL); + if (ret == 0) { + send_response_ng(response, ctx->successful_response_code, NULL); + } + else { + send_response_ng(response, 400, "error: unable to process records\n"); + } return ret; } diff --git a/tests/runtime/in_http.c b/tests/runtime/in_http.c index 700e581f83f..c949288fd4d 100644 --- a/tests/runtime/in_http.c +++ b/tests/runtime/in_http.c @@ -277,7 +277,6 @@ void flb_test_http() flb_upstream_conn_release(ctx->httpc->u_conn); test_ctx_destroy(ctx); } - void flb_test_http_successful_response_code(char *response_code) { struct flb_lib_out_cb cb_data; @@ -361,6 +360,155 @@ void flb_test_http_successful_response_code_204() flb_test_http_successful_response_code("204"); } +void flb_test_http_failure_400_bad_json() { + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + size_t b_sent; + + char *buf = "\"INVALIDJSON"; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"test\":\"msg\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 400)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 400, c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_http_failure_400_bad_disk_write() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + size_t b_sent; + + char *buf = "{\"foo\": \"bar\"}"; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"test\":\"msg\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_service_set(ctx->flb, + "storage.path", "/tmp/http-input-test-404-bad-write", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "storage.type", "filesystem", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + flb_time_msleep(5000); + + ret = chmod("/tmp/http-input-test-404-bad-write", 000); + TEST_CHECK(ret == 0); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 400)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 400, c->resp.status); + } + + chmod("/tmp/http-input-test-404-bad-write/http.0", 0700); + rmdir("/tmp/http-input-test-404-bad-write/http.0"); + + chmod("/tmp/http-input-test-404-bad-write", 0700); + rmdir("/tmp/http-input-test-404-bad-write"); + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + void flb_test_http_tag_key() { struct flb_lib_out_cb cb_data; @@ -438,7 +586,8 @@ TEST_LIST = { {"http", flb_test_http}, {"successful_response_code_200", flb_test_http_successful_response_code_200}, {"successful_response_code_204", flb_test_http_successful_response_code_204}, + {"failure_response_code_400_bad_json", flb_test_http_failure_400_bad_json}, + {"failure_response_code_400_bad_disk_write", flb_test_http_failure_400_bad_disk_write}, {"tag_key", flb_test_http_tag_key}, {NULL, NULL} }; -