mirror of
https://github.com/twitter/the-algorithm.git
synced 2024-11-10 21:55:10 +01:00
fix(twml): implement bwedding's fixes
Signed-off-by: Elijah Conners <business@elijahpepe.com>
This commit is contained in:
parent
696f03f220
commit
1020c610ef
@ -33,7 +33,7 @@ namespace twml {
|
||||
int record_index_;
|
||||
int records_per_block_;
|
||||
|
||||
int pack_tag_and_wiretype(std::fstream& fs, uint32_t tag, uint32_t wiretype);
|
||||
int pack_tag_and_wiretype(std::fstream& fs, std::uint32_t tag, std::uint32_t wiretype);
|
||||
int pack_varint_i32(std::fstream& fs, int value);
|
||||
int pack_string(std::fstream& fs, const char *in, size_t in_len);
|
||||
int write_int(std::fstream& fs, int value);
|
||||
|
@ -11,7 +11,7 @@
|
||||
#endif
|
||||
|
||||
#define MARKER_SIZE (16)
|
||||
static uint8_t _marker[MARKER_SIZE] = {
|
||||
static std::uint8_t _marker[MARKER_SIZE] = {
|
||||
0x29, 0xd8, 0xd5, 0x06, 0x58, 0xcd, 0x4c, 0x29,
|
||||
0xb2, 0xbc, 0x57, 0x99, 0x21, 0x71, 0xbd, 0xff
|
||||
};
|
||||
@ -21,10 +21,13 @@ namespace twml {
|
||||
file_name_(file_name), record_index_(0), records_per_block_(record_per_block) {
|
||||
snprintf(temp_file_name_, PATH_MAX, "%s.block", file_name);
|
||||
std::fstream outputfile_(file_name_, std::ios::app);
|
||||
if (!outputfile_.is_open()) {
|
||||
throw std::runtime_error("Failed to open the output file");
|
||||
}
|
||||
}
|
||||
|
||||
int BlockFormatWriter::pack_tag_and_wiretype(std::fstream& fs, uint32_t tag, uint32_t wiretype) {
|
||||
uint8_t x = ((tag & 0x0f) << 3) | (wiretype & 0x7);
|
||||
int BlockFormatWriter::pack_tag_and_wiretype(std::fstream& fs, std::uint32_t tag, std::uint32_t wiretype) {
|
||||
std::uint8_t x = ((tag & 0x0f) << 3) | (wiretype & 0x7);
|
||||
fs.write(reinterpret_cast<char*>(&x), sizeof(x));
|
||||
if (fs.fail()) {
|
||||
return -1;
|
||||
@ -34,10 +37,10 @@ namespace twml {
|
||||
|
||||
int BlockFormatWriter::pack_varint_i32(std::fstream& fs, int value) {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
uint8_t x = value & 0x7F;
|
||||
std::uint8_t x = value & 0x7F;
|
||||
value = value >> 7;
|
||||
if (value != 0) x |= 0x80;
|
||||
fs.write(reinterpret_cast<char*>(&x), sizeof(x));
|
||||
fs.write(reinterpret_cast<const char*>(&x), sizeof(x));
|
||||
if (fs.fail()) {
|
||||
return -1;
|
||||
}
|
||||
@ -56,11 +59,11 @@ namespace twml {
|
||||
}
|
||||
|
||||
int BlockFormatWriter::write_int(std::fstream& fs, int value) {
|
||||
uint8_t buff[4];
|
||||
buff[0] = value & 0xff;
|
||||
buff[1] = (value >> 8) & 0xff;
|
||||
buff[2] = (value >> 16) & 0xff;
|
||||
buff[3] = (value >> 24) & 0xff;
|
||||
std::uint8_t buff[4];
|
||||
buff[0] = static_cast<std::uint8_t>(value & 0xff);
|
||||
buff[1] = static_cast<std::uint8_t>((value >> 8) & 0xff);
|
||||
buff[2] = static_cast<std::uint8_t>((value >> 16) & 0xff);
|
||||
buff[3] = static_cast<std::uint8_t>((value >> 24) & 0xff);
|
||||
fs.write(reinterpret_cast<char*>(buff), sizeof(buff));
|
||||
if (fs.fail()) {
|
||||
return -1;
|
||||
@ -73,7 +76,7 @@ namespace twml {
|
||||
record_index_++;
|
||||
// The buffer holds max records_per_block_ of records (block).
|
||||
std::fstream buffer(temp_file_name_, std::ios::app);
|
||||
if (!buffer) return -1;
|
||||
if (!buffer.is_open()) return -1;
|
||||
if (buffer.tellg() == 0) {
|
||||
if (pack_tag_and_wiretype(buffer, 1, WIRE_TYPE_VARINT))
|
||||
throw std::invalid_argument("Error writting tag and wiretype");
|
||||
@ -106,12 +109,12 @@ namespace twml {
|
||||
|
||||
if (outputfile_.write(reinterpret_cast<const char*>(_marker), sizeof(_marker)).fail() != 1) return 1;
|
||||
if (write_int(outputfile_, block_size)) return 1;
|
||||
uint8_t buff[4096];
|
||||
std::uint8_t buff[4096];
|
||||
while (1) {
|
||||
buffer.read(reinterpret_cast<char*>(buff), sizeof(buff));
|
||||
std::streamsize n = buffer.gcount();
|
||||
if (n) {
|
||||
outputfile_.write(reinterpret_cast<char*>(buff), n);
|
||||
outputfile_.write(reinterpret_cast<const char*>(buff), n);
|
||||
if (outputfile_.fail()) return 1;
|
||||
}
|
||||
if (n != sizeof(buff)) break;
|
||||
|
Loading…
Reference in New Issue
Block a user