[json] fixes for json log parsing

The initial JSON log implementation was careless: it did not deal
with incomplete log lines very well.  This change seems to fix some
cases, but I was still able to reproduce a lockup at one point, so the
fix might not be complete.
pull/290/head
Timothy Stack 9 years ago
parent 64cbab1281
commit 8964232374

@ -609,19 +609,18 @@ bool external_log_format::scan_for_partial(shared_buffer_ref &sbr, size_t &len_o
return len_out > pat->p_timestamp_end; return len_out > pat->p_timestamp_end;
} }
bool external_log_format::scan(std::vector<logline> &dst, log_format::scan_result_t external_log_format::scan(std::vector<logline> &dst,
off_t offset, off_t offset,
shared_buffer_ref &sbr) shared_buffer_ref &sbr)
{ {
if (this->jlf_json) { if (this->jlf_json) {
yajlpp_parse_context &ypc = *(this->jlf_parse_context); yajlpp_parse_context &ypc = *(this->jlf_parse_context);
logline ll(offset, 0, 0, logline::LEVEL_INFO); logline ll(offset, 0, 0, logline::LEVEL_INFO);
yajl_handle handle = this->jlf_yajl_handle.in(); yajl_handle handle = this->jlf_yajl_handle.in();
json_log_userdata jlu(sbr); json_log_userdata jlu(sbr);
bool retval = false;
if (sbr.empty() || sbr.get_data()[sbr.length() - 1] != '}') { if (sbr.empty() || sbr.get_data()[sbr.length() - 1] != '}') {
return false; return log_format::SCAN_INCOMPLETE;
} }
yajl_reset(handle); yajl_reset(handle);
@ -648,19 +647,23 @@ bool external_log_format::scan(std::vector<logline> &dst,
} }
dst.push_back(ll); dst.push_back(ll);
} }
retval = true;
} }
else { else {
unsigned char *msg = yajl_get_error(handle, 1, (const unsigned char *)sbr.get_data(), sbr.length()); unsigned char *msg;
log_debug("bad line %s", msg);
msg = yajl_get_error(handle, 1, (const unsigned char *)sbr.get_data(), sbr.length());
if (msg != NULL) {
log_debug("Unable to parse line at offset %d: %s", offset, msg);
yajl_free_error(handle, msg);
}
return log_format::SCAN_INCOMPLETE;
} }
return retval; return log_format::SCAN_MATCH;
} }
pcre_input pi(sbr.get_data(), 0, sbr.length()); pcre_input pi(sbr.get_data(), 0, sbr.length());
pcre_context_static<128> pc; pcre_context_static<128> pc;
bool retval = false;
int curr_fmt = -1; int curr_fmt = -1;
while (::next_format(this->elf_pattern_order, curr_fmt, this->lf_fmt_lock)) { while (::next_format(this->elf_pattern_order, curr_fmt, this->lf_fmt_lock)) {
@ -737,11 +740,10 @@ bool external_log_format::scan(std::vector<logline> &dst,
dst.push_back(logline(offset, log_tv, level, mod_index, opid)); dst.push_back(logline(offset, log_tv, level, mod_index, opid));
this->lf_fmt_lock = curr_fmt; this->lf_fmt_lock = curr_fmt;
retval = true; return log_format::SCAN_MATCH;
break;
} }
return retval; return log_format::SCAN_NO_MATCH;
} }
uint8_t external_log_format::module_scan(const pcre_input &pi, uint8_t external_log_format::module_scan(const pcre_input &pi,

@ -608,6 +608,12 @@ public:
virtual bool match_name(const std::string &filename) { return true; }; virtual bool match_name(const std::string &filename) { return true; };
enum scan_result_t {
SCAN_MATCH,
SCAN_NO_MATCH,
SCAN_INCOMPLETE,
};
/** /**
* Scan a log line to see if it matches this log format. * Scan a log line to see if it matches this log format.
* *
@ -617,9 +623,9 @@ public:
* @param prefix The contents of the line. * @param prefix The contents of the line.
* @param len The length of the prefix string. * @param len The length of the prefix string.
*/ */
virtual bool scan(std::vector<logline> &dst, virtual scan_result_t scan(std::vector<logline> &dst,
off_t offset, off_t offset,
shared_buffer_ref &sbr) = 0; shared_buffer_ref &sbr) = 0;
virtual bool scan_for_partial(shared_buffer_ref &sbr, size_t &len_out) { virtual bool scan_for_partial(shared_buffer_ref &sbr, size_t &len_out) {
return false; return false;
@ -816,9 +822,9 @@ public:
return this->elf_filename_pcre->match(pc, pi); return this->elf_filename_pcre->match(pc, pi);
}; };
bool scan(std::vector<logline> &dst, scan_result_t scan(std::vector<logline> &dst,
off_t offset, off_t offset,
shared_buffer_ref &sbr); shared_buffer_ref &sbr);
bool scan_for_partial(shared_buffer_ref &sbr, size_t &len_out); bool scan_for_partial(shared_buffer_ref &sbr, size_t &len_out);

@ -130,11 +130,10 @@ class generic_log_format : public log_format {
} }
}; };
bool scan(vector<logline> &dst, scan_result_t scan(vector<logline> &dst,
off_t offset, off_t offset,
shared_buffer_ref &sbr) shared_buffer_ref &sbr)
{ {
bool retval = false;
struct exttm log_time; struct exttm log_time;
struct timeval log_tv; struct timeval log_tv;
pcre_context::capture_t ts, level; pcre_context::capture_t ts, level;
@ -157,10 +156,10 @@ class generic_log_format : public log_format {
this->check_for_new_year(dst, log_time, log_tv); this->check_for_new_year(dst, log_time, log_tv);
dst.push_back(logline(offset, log_tv, level_val)); dst.push_back(logline(offset, log_tv, level_val));
retval = true; return SCAN_MATCH;
} }
return retval; return SCAN_NO_MATCH;
}; };
void annotate(shared_buffer_ref &line, void annotate(shared_buffer_ref &line,

@ -137,7 +137,7 @@ void logfile::set_format_base_time(log_format *lf)
void logfile::process_prefix(off_t offset, shared_buffer_ref &sbr) void logfile::process_prefix(off_t offset, shared_buffer_ref &sbr)
{ {
bool found = false; log_format::scan_result_t found = log_format::SCAN_NO_MATCH;
if (this->lf_format.get() != NULL) { if (this->lf_format.get() != NULL) {
/* We've locked onto a format, just use that scanner. */ /* We've locked onto a format, just use that scanner. */
@ -153,7 +153,7 @@ void logfile::process_prefix(off_t offset, shared_buffer_ref &sbr)
* are sufficiently different that there are no ambiguities... * are sufficiently different that there are no ambiguities...
*/ */
for (iter = root_formats.begin(); for (iter = root_formats.begin();
iter != root_formats.end() && !found; iter != root_formats.end() && (found != log_format::SCAN_MATCH);
++iter) { ++iter) {
if (!(*iter)->match_name(this->lf_filename)) { if (!(*iter)->match_name(this->lf_filename)) {
continue; continue;
@ -161,7 +161,8 @@ void logfile::process_prefix(off_t offset, shared_buffer_ref &sbr)
(*iter)->clear(); (*iter)->clear();
this->set_format_base_time(*iter); this->set_format_base_time(*iter);
if ((*iter)->scan(this->lf_index, offset, sbr)) { found = (*iter)->scan(this->lf_index, offset, sbr);
if (found == log_format::SCAN_MATCH) {
#if 0 #if 0
require(this->lf_index.size() == 1 || require(this->lf_index.size() == 1 ||
(this->lf_index[this->lf_index.size() - 2] < (this->lf_index[this->lf_index.size() - 2] <
@ -176,7 +177,6 @@ void logfile::process_prefix(off_t offset, shared_buffer_ref &sbr)
auto_ptr<log_format>((*iter)->specialized()); auto_ptr<log_format>((*iter)->specialized());
this->set_format_base_time(this->lf_format.get()); this->set_format_base_time(this->lf_format.get());
this->lf_content_id = hash_string(string(sbr.get_data(), sbr.length())); this->lf_content_id = hash_string(string(sbr.get_data(), sbr.length()));
found = true;
/* /*
* We'll go ahead and assume that any previous lines were * We'll go ahead and assume that any previous lines were
@ -193,47 +193,50 @@ void logfile::process_prefix(off_t offset, shared_buffer_ref &sbr)
} }
} }
if (found) { switch (found) {
if (this->lf_index.size() >= 2) { case log_format::SCAN_MATCH:
logline &second_to_last = this->lf_index[this->lf_index.size() - 2]; if (this->lf_index.size() >= 2) {
logline &latest = this->lf_index.back(); logline &second_to_last = this->lf_index[this->lf_index.size() - 2];
logline &latest = this->lf_index.back();
if (latest < second_to_last) { if (latest < second_to_last) {
latest.set_time(second_to_last.get_time()); latest.set_time(second_to_last.get_time());
latest.set_millis(second_to_last.get_millis()); latest.set_millis(second_to_last.get_millis());
}
} }
} break;
} case log_format::SCAN_NO_MATCH: {
logline::level_t last_level = logline::LEVEL_UNKNOWN;
time_t last_time = this->lf_index_time;
short last_millis = 0;
uint8_t last_mod = 0, last_opid = 0;
/* If the scanner didn't match, than we need to add it. */ if (!this->lf_index.empty()) {
if (!found) { logline &ll = this->lf_index.back();
logline::level_t last_level = logline::LEVEL_UNKNOWN;
time_t last_time = this->lf_index_time;
short last_millis = 0;
uint8_t last_mod = 0, last_opid = 0;
if (!this->lf_index.empty()) { /*
logline &ll = this->lf_index.back(); * Assume this line is part of the previous one(s) and copy the
* metadata over.
/* */
* Assume this line is part of the previous one(s) and copy the last_time = ll.get_time();
* metadata over. last_millis = ll.get_millis();
*/ if (this->lf_format.get() != NULL) {
last_time = ll.get_time(); last_level = (logline::level_t)
last_millis = ll.get_millis(); (ll.get_level() | logline::LEVEL_CONTINUED);
if (this->lf_format.get() != NULL) { }
last_level = (logline::level_t) last_mod = ll.get_module_id();
(ll.get_level() | logline::LEVEL_CONTINUED); last_opid = ll.get_opid();
} }
last_mod = ll.get_module_id(); this->lf_index.push_back(logline(offset,
last_opid = ll.get_opid(); last_time,
last_millis,
last_level,
last_mod,
last_opid));
break;
} }
this->lf_index.push_back(logline(offset, case log_format::SCAN_INCOMPLETE:
last_time, break;
last_millis,
last_level,
last_mod,
last_opid));
} }
} }

@ -144,7 +144,7 @@ int main(int argc, char *argv[])
iter != root_formats.end() && !found; iter != root_formats.end() && !found;
++iter) { ++iter) {
(*iter)->clear(); (*iter)->clear();
if ((*iter)->scan(index, 13, sbr)) { if ((*iter)->scan(index, 13, sbr) == log_format::SCAN_MATCH) {
format = (*iter)->specialized(); format = (*iter)->specialized();
found = true; found = true;
} }

Loading…
Cancel
Save