mirror of
https://github.com/twitter/the-algorithm.git
synced 2025-01-07 01:48:16 +01:00
fix(libtwml): use fstream in BlockFormatWriter
Signed-off-by: Elijah Conners <business@elijahpepe.com>
This commit is contained in:
parent
ec83d01dca
commit
696f03f220
@ -1,4 +1,5 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
#include <fstream>
|
||||||
#include <twml/defines.h>
|
#include <twml/defines.h>
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <cstdio>
|
#include <cstdio>
|
||||||
@ -27,19 +28,18 @@ namespace twml {
|
|||||||
class BlockFormatWriter {
|
class BlockFormatWriter {
|
||||||
private:
|
private:
|
||||||
const char *file_name_;
|
const char *file_name_;
|
||||||
FILE *outputfile_;
|
std::fstream outputfile_;
|
||||||
char temp_file_name_[PATH_MAX];
|
char temp_file_name_[PATH_MAX];
|
||||||
int record_index_;
|
int record_index_;
|
||||||
int records_per_block_;
|
int records_per_block_;
|
||||||
|
|
||||||
int pack_tag_and_wiretype(FILE *file, uint32_t tag, uint32_t wiretype);
|
int pack_tag_and_wiretype(std::fstream& fs, uint32_t tag, uint32_t wiretype);
|
||||||
int pack_varint_i32(FILE *file, int value);
|
int pack_varint_i32(std::fstream& fs, int value);
|
||||||
int pack_string(FILE *file, const char *in, size_t in_len);
|
int pack_string(std::fstream& fs, const char *in, size_t in_len);
|
||||||
int write_int(FILE *file, int value);
|
int write_int(std::fstream& fs, int value);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
BlockFormatWriter(const char *file_name, int record_per_block);
|
BlockFormatWriter(const char *file_name, int record_per_block);
|
||||||
~BlockFormatWriter();
|
|
||||||
int write(const char *class_name, const char *record, int record_len) ;
|
int write(const char *class_name, const char *record, int record_len) ;
|
||||||
int flush();
|
int flush();
|
||||||
block_format_writer getHandle();
|
block_format_writer getHandle();
|
||||||
|
@ -20,29 +20,25 @@ namespace twml {
|
|||||||
BlockFormatWriter::BlockFormatWriter(const char *file_name, int record_per_block) :
|
BlockFormatWriter::BlockFormatWriter(const char *file_name, int record_per_block) :
|
||||||
file_name_(file_name), record_index_(0), records_per_block_(record_per_block) {
|
file_name_(file_name), record_index_(0), records_per_block_(record_per_block) {
|
||||||
snprintf(temp_file_name_, PATH_MAX, "%s.block", file_name);
|
snprintf(temp_file_name_, PATH_MAX, "%s.block", file_name);
|
||||||
outputfile_ = fopen(file_name_, "a");
|
std::fstream outputfile_(file_name_, std::ios::app);
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockFormatWriter::~BlockFormatWriter() {
|
int BlockFormatWriter::pack_tag_and_wiretype(std::fstream& fs, uint32_t tag, uint32_t wiretype) {
|
||||||
fclose(outputfile_);
|
|
||||||
}
|
|
||||||
// TODO: use fstream
|
|
||||||
int BlockFormatWriter::pack_tag_and_wiretype(FILE *buffer, uint32_t tag, uint32_t wiretype) {
|
|
||||||
uint8_t x = ((tag & 0x0f) << 3) | (wiretype & 0x7);
|
uint8_t x = ((tag & 0x0f) << 3) | (wiretype & 0x7);
|
||||||
size_t n = fwrite(&x, 1, 1, buffer);
|
fs.write(reinterpret_cast<char*>(&x), sizeof(x));
|
||||||
if (n != 1) {
|
if (fs.fail()) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int BlockFormatWriter::pack_varint_i32(FILE *buffer, int value) {
|
int BlockFormatWriter::pack_varint_i32(std::fstream& fs, int value) {
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
uint8_t x = value & 0x7F;
|
uint8_t x = value & 0x7F;
|
||||||
value = value >> 7;
|
value = value >> 7;
|
||||||
if (value != 0) x |= 0x80;
|
if (value != 0) x |= 0x80;
|
||||||
size_t n = fwrite(&x, 1, 1, buffer);
|
fs.write(reinterpret_cast<char*>(&x), sizeof(x));
|
||||||
if (n != 1) {
|
if (fs.fail()) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (value == 0) break;
|
if (value == 0) break;
|
||||||
@ -50,21 +46,23 @@ namespace twml {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int BlockFormatWriter::pack_string(FILE *buffer, const char *in, size_t in_len) {
|
int BlockFormatWriter::pack_string(std::fstream& fs, const char *in, size_t in_len) {
|
||||||
if (pack_varint_i32(buffer, in_len)) return -1;
|
if (pack_varint_i32(fs, in_len)) return -1;
|
||||||
size_t n = fwrite(in, 1, in_len, buffer);
|
fs.write(in, in_len);
|
||||||
if (n != in_len) return -1;
|
if (fs.fail()) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int BlockFormatWriter::write_int(FILE *buffer, int value) {
|
int BlockFormatWriter::write_int(std::fstream& fs, int value) {
|
||||||
uint8_t buff[4];
|
uint8_t buff[4];
|
||||||
buff[0] = value & 0xff;
|
buff[0] = value & 0xff;
|
||||||
buff[1] = (value >> 8) & 0xff;
|
buff[1] = (value >> 8) & 0xff;
|
||||||
buff[2] = (value >> 16) & 0xff;
|
buff[2] = (value >> 16) & 0xff;
|
||||||
buff[3] = (value >> 24) & 0xff;
|
buff[3] = (value >> 24) & 0xff;
|
||||||
size_t n = fwrite(buff, 1, 4, buffer);
|
fs.write(reinterpret_cast<char*>(buff), sizeof(buff));
|
||||||
if (n != 4) {
|
if (fs.fail()) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
@ -74,9 +72,9 @@ namespace twml {
|
|||||||
if (record) {
|
if (record) {
|
||||||
record_index_++;
|
record_index_++;
|
||||||
// The buffer holds max records_per_block_ of records (block).
|
// The buffer holds max records_per_block_ of records (block).
|
||||||
FILE *buffer = fopen(temp_file_name_, "a");
|
std::fstream buffer(temp_file_name_, std::ios::app);
|
||||||
if (!buffer) return -1;
|
if (!buffer) return -1;
|
||||||
if (ftell(buffer) == 0) {
|
if (buffer.tellg() == 0) {
|
||||||
if (pack_tag_and_wiretype(buffer, 1, WIRE_TYPE_VARINT))
|
if (pack_tag_and_wiretype(buffer, 1, WIRE_TYPE_VARINT))
|
||||||
throw std::invalid_argument("Error writting tag and wiretype");
|
throw std::invalid_argument("Error writting tag and wiretype");
|
||||||
if (pack_varint_i32(buffer, 1))
|
if (pack_varint_i32(buffer, 1))
|
||||||
@ -90,7 +88,6 @@ namespace twml {
|
|||||||
throw std::invalid_argument("Error writtig tag and wiretype");
|
throw std::invalid_argument("Error writtig tag and wiretype");
|
||||||
if (pack_string(buffer, record, record_len))
|
if (pack_string(buffer, record, record_len))
|
||||||
throw std::invalid_argument("Error writting record");
|
throw std::invalid_argument("Error writting record");
|
||||||
fclose(buffer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((record_index_ % records_per_block_) == 0) {
|
if ((record_index_ % records_per_block_) == 0) {
|
||||||
@ -101,24 +98,24 @@ namespace twml {
|
|||||||
|
|
||||||
int BlockFormatWriter::flush() {
|
int BlockFormatWriter::flush() {
|
||||||
// Flush the records in the buffer to outputfile
|
// Flush the records in the buffer to outputfile
|
||||||
FILE *buffer = fopen(temp_file_name_, "r");
|
std::fstream buffer(temp_file_name_, std::ios::in);
|
||||||
if (buffer) {
|
if (buffer) {
|
||||||
fseek(buffer, 0, SEEK_END);
|
buffer.seekg(0, buffer.end);
|
||||||
int64_t block_size = ftell(buffer);
|
int64_t block_size = buffer.tellg();
|
||||||
fseek(buffer, 0, SEEK_SET);
|
buffer.seekg(0, buffer.beg);
|
||||||
|
|
||||||
if (fwrite(_marker, sizeof(_marker), 1, outputfile_) != 1) return 1;
|
if (outputfile_.write(reinterpret_cast<const char*>(_marker), sizeof(_marker)).fail() != 1) return 1;
|
||||||
if (write_int(outputfile_, block_size)) return 1;
|
if (write_int(outputfile_, block_size)) return 1;
|
||||||
uint8_t buff[4096];
|
uint8_t buff[4096];
|
||||||
while (1) {
|
while (1) {
|
||||||
size_t n = fread(buff, 1, sizeof(buff), buffer);
|
buffer.read(reinterpret_cast<char*>(buff), sizeof(buff));
|
||||||
|
std::streamsize n = buffer.gcount();
|
||||||
if (n) {
|
if (n) {
|
||||||
size_t x = fwrite(buff, 1, n, outputfile_);
|
outputfile_.write(reinterpret_cast<char*>(buff), n);
|
||||||
if (x != n) return 1;
|
if (outputfile_.fail()) return 1;
|
||||||
}
|
}
|
||||||
if (n != sizeof(buff)) break;
|
if (n != sizeof(buff)) break;
|
||||||
}
|
}
|
||||||
fclose(buffer);
|
|
||||||
// Remove the buffer
|
// Remove the buffer
|
||||||
if (remove(temp_file_name_)) return 1;
|
if (remove(temp_file_name_)) return 1;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user