mirror of
https://github.com/koreader/koreader
synced 2024-11-10 01:10:34 +00:00
Calibre: More QoL tweaks (#7545)
* Wireless: Optimize memory usage in StreamMessageQueue (use an array of string ropes, that we only concatenate once). Allowed to relax the throttling, making transfers that much faster. * Persist: Add a "zstd" codec, that uses the "luajit" codec, but compressed via zstd. Since both of those are very fast, it pretty much trounces everything in terms of speed and size ;). * Persist: Implemented a "writes_to_file" framework, much like the existing "reads_from_file" one. And use it in the zstd codec to avoid useless temporary string interning. * Metadata: Switch to the zstd codec.
This commit is contained in:
parent
47c59e0e5a
commit
ea3fa5c2c7
2
base
2
base
@ -1 +1 @@
|
|||||||
Subproject commit 52d5705f57249f1a07b15a65bb0fd22fa16f3a2d
|
Subproject commit c9555f7fdda737fd6970425d237174073d67d054
|
@ -1,8 +1,12 @@
|
|||||||
local bitser = require("ffi/bitser")
|
local bitser = require("ffi/bitser")
|
||||||
local buffer = require("string.buffer")
|
local buffer = require("string.buffer")
|
||||||
local dump = require("dump")
|
local dump = require("dump")
|
||||||
|
local ffi = require("ffi")
|
||||||
local lfs = require("libs/libkoreader-lfs")
|
local lfs = require("libs/libkoreader-lfs")
|
||||||
local logger = require("logger")
|
local logger = require("logger")
|
||||||
|
local zstd = require("ffi/zstd")
|
||||||
|
|
||||||
|
local C = ffi.C
|
||||||
|
|
||||||
local function readFile(file, bytes)
|
local function readFile(file, bytes)
|
||||||
local f, str, err
|
local f, str, err
|
||||||
@ -23,6 +27,7 @@ local codecs = {
|
|||||||
bitser = {
|
bitser = {
|
||||||
id = "bitser",
|
id = "bitser",
|
||||||
reads_from_file = false,
|
reads_from_file = false,
|
||||||
|
writes_to_file = false,
|
||||||
|
|
||||||
serialize = function(t)
|
serialize = function(t)
|
||||||
local ok, str = pcall(bitser.dumps, t)
|
local ok, str = pcall(bitser.dumps, t)
|
||||||
@ -45,6 +50,7 @@ local codecs = {
|
|||||||
luajit = {
|
luajit = {
|
||||||
id = "luajit",
|
id = "luajit",
|
||||||
reads_from_file = false,
|
reads_from_file = false,
|
||||||
|
writes_to_file = false,
|
||||||
|
|
||||||
serialize = function(t)
|
serialize = function(t)
|
||||||
local ok, str = pcall(buffer.encode, t)
|
local ok, str = pcall(buffer.encode, t)
|
||||||
@ -62,10 +68,74 @@ local codecs = {
|
|||||||
return t
|
return t
|
||||||
end,
|
end,
|
||||||
},
|
},
|
||||||
-- dump: human readable, pretty printed, fast enough for most user cases.
|
-- zstd: luajit, but compressed w/ zstd ;). Much smaller, at a very small performance cost (decompressing is *fast*).
|
||||||
|
zstd = {
|
||||||
|
id = "zstd",
|
||||||
|
reads_from_file = true,
|
||||||
|
writes_to_file = true,
|
||||||
|
|
||||||
|
serialize = function(t, as_bytecode, path)
|
||||||
|
local ok, str = pcall(buffer.encode, t)
|
||||||
|
if not ok then
|
||||||
|
return nil, "cannot serialize " .. tostring(t) .. " (" .. str .. ")"
|
||||||
|
end
|
||||||
|
|
||||||
|
local cbuff, clen = zstd.zstd_compress(str, #str)
|
||||||
|
|
||||||
|
local f = C.fopen(path, "wb")
|
||||||
|
if f == nil then
|
||||||
|
return nil, "fopen: " .. ffi.string(C.strerror(ffi.errno()))
|
||||||
|
end
|
||||||
|
if C.fwrite(cbuff, 1, clen, f) < clen then
|
||||||
|
C.fclose(f)
|
||||||
|
C.free(cbuff)
|
||||||
|
return nil, "failed to write file"
|
||||||
|
end
|
||||||
|
C.fclose(f)
|
||||||
|
C.free(cbuff)
|
||||||
|
|
||||||
|
return true
|
||||||
|
end,
|
||||||
|
|
||||||
|
deserialize = function(path)
|
||||||
|
local f = C.fopen(path, "rb")
|
||||||
|
if f == nil then
|
||||||
|
return nil, "fopen: " .. ffi.string(C.strerror(ffi.errno()))
|
||||||
|
end
|
||||||
|
local size = lfs.attributes(path, "size")
|
||||||
|
-- NOTE: In a perfect world, we'd just mmap the file.
|
||||||
|
-- But that's problematic on a portability level: while mmap is POSIX, implementations differ,
|
||||||
|
-- and some old platforms don't support mmap-on-vfat (Legacy Kindle) :'(.
|
||||||
|
local data = C.malloc(size)
|
||||||
|
if data == nil then
|
||||||
|
C.fclose(f)
|
||||||
|
return nil, "failed to allocate read buffer"
|
||||||
|
end
|
||||||
|
if C.fread(data, 1, size, f) < size or C.ferror(f) ~= 0 then
|
||||||
|
C.free(data)
|
||||||
|
C.fclose(f)
|
||||||
|
return nil, "failed to read file"
|
||||||
|
end
|
||||||
|
C.fclose(f)
|
||||||
|
|
||||||
|
local buff, ulen = zstd.zstd_uncompress(data, size)
|
||||||
|
C.free(data)
|
||||||
|
|
||||||
|
local str = ffi.string(buff, ulen)
|
||||||
|
C.free(buff)
|
||||||
|
|
||||||
|
local ok, t = pcall(buffer.decode, str)
|
||||||
|
if not ok then
|
||||||
|
return nil, "malformed serialized data (" .. t .. ")"
|
||||||
|
end
|
||||||
|
return t
|
||||||
|
end,
|
||||||
|
},
|
||||||
|
-- dump: human readable, pretty printed, fast enough for most use cases.
|
||||||
dump = {
|
dump = {
|
||||||
id = "dump",
|
id = "dump",
|
||||||
reads_from_file = true,
|
reads_from_file = true,
|
||||||
|
writes_to_file = false,
|
||||||
|
|
||||||
serialize = function(t, as_bytecode)
|
serialize = function(t, as_bytecode)
|
||||||
local content
|
local content
|
||||||
@ -141,17 +211,24 @@ function Persist:load()
|
|||||||
end
|
end
|
||||||
|
|
||||||
function Persist:save(t, as_bytecode)
|
function Persist:save(t, as_bytecode)
|
||||||
local str, file, err
|
if codecs[self.codec].writes_to_file then
|
||||||
str, err = codecs[self.codec].serialize(t, as_bytecode)
|
local ok, err = codecs[self.codec].serialize(t, as_bytecode, self.path)
|
||||||
|
if not ok then
|
||||||
|
return nil, err
|
||||||
|
end
|
||||||
|
else
|
||||||
|
local str, err = codecs[self.codec].serialize(t, as_bytecode)
|
||||||
if not str then
|
if not str then
|
||||||
return nil, err
|
return nil, err
|
||||||
end
|
end
|
||||||
|
local file
|
||||||
file, err = io.open(self.path, "wb")
|
file, err = io.open(self.path, "wb")
|
||||||
if not file then
|
if not file then
|
||||||
return nil, err
|
return nil, err
|
||||||
end
|
end
|
||||||
file:write(str)
|
file:write(str)
|
||||||
file:close()
|
file:close()
|
||||||
|
end
|
||||||
return true
|
return true
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -203,7 +203,7 @@ if last_migration_date < 20210414 then
|
|||||||
logger.info("Performing one-time migration for 20210414")
|
logger.info("Performing one-time migration for 20210414")
|
||||||
|
|
||||||
local cache_path = DataStorage:getDataDir() .. "/cache/calibre"
|
local cache_path = DataStorage:getDataDir() .. "/cache/calibre"
|
||||||
ok, err = os.remove(cache_path .. "/books.dat")
|
local ok, err = os.remove(cache_path .. "/books.dat")
|
||||||
if not ok then
|
if not ok then
|
||||||
logger.warn("os.remove:", err)
|
logger.warn("os.remove:", err)
|
||||||
end
|
end
|
||||||
|
@ -56,13 +56,18 @@ function StreamMessageQueue:handleZframe(frame)
|
|||||||
end
|
end
|
||||||
|
|
||||||
function StreamMessageQueue:waitEvent()
|
function StreamMessageQueue:waitEvent()
|
||||||
local data = ""
|
|
||||||
-- Successive zframes may come in batches of tens or hundreds in some cases.
|
-- Successive zframes may come in batches of tens or hundreds in some cases.
|
||||||
-- If they are concatenated in a single loop, it may consume a significant amount
|
-- Since we buffer each frame's data in a Lua string,
|
||||||
-- of memory. And it's fairly easy to trigger when receiving file data from Calibre.
|
-- and then let the caller concatenate those,
|
||||||
-- So, throttle reception to 10 packages at most in one waitEvent loop,
|
-- it may consume a significant amount of memory.
|
||||||
|
-- And it's fairly easy to trigger when receiving file data from Calibre.
|
||||||
|
-- So, throttle reception to 256 packages at most in one waitEvent loop,
|
||||||
-- after which we immediately call receiveCallback.
|
-- after which we immediately call receiveCallback.
|
||||||
local wait_packages = 10
|
local wait_packages = 256
|
||||||
|
-- In a similar spirit, much like LuaSocket,
|
||||||
|
-- we store the data as ropes of strings in an array,
|
||||||
|
-- to be concatenated by the caller.
|
||||||
|
local t = {}
|
||||||
while czmq.zpoller_wait(self.poller, 0) ~= nil and wait_packages > 0 do
|
while czmq.zpoller_wait(self.poller, 0) ~= nil and wait_packages > 0 do
|
||||||
local id_frame = czmq.zframe_recv(self.socket)
|
local id_frame = czmq.zframe_recv(self.socket)
|
||||||
if id_frame ~= nil then
|
if id_frame ~= nil then
|
||||||
@ -70,12 +75,15 @@ function StreamMessageQueue:waitEvent()
|
|||||||
end
|
end
|
||||||
local frame = czmq.zframe_recv(self.socket)
|
local frame = czmq.zframe_recv(self.socket)
|
||||||
if frame ~= nil then
|
if frame ~= nil then
|
||||||
data = data .. (self:handleZframe(frame) or "")
|
local data = self:handleZframe(frame)
|
||||||
|
if data then
|
||||||
|
table.insert(t, data)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
wait_packages = wait_packages - 1
|
wait_packages = wait_packages - 1
|
||||||
end
|
end
|
||||||
if self.receiveCallback and data ~= "" then
|
if self.receiveCallback and #t ~= 0 then
|
||||||
self.receiveCallback(data)
|
self.receiveCallback(t)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -176,7 +176,7 @@ local CalibreSearch = InputContainer:new{
|
|||||||
},
|
},
|
||||||
cache_books = Persist:new{
|
cache_books = Persist:new{
|
||||||
path = DataStorage:getDataDir() .. "/cache/calibre/books.dat",
|
path = DataStorage:getDataDir() .. "/cache/calibre/books.dat",
|
||||||
codec = "luajit",
|
codec = "zstd",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,7 +132,8 @@ function CalibreWireless:JSONReceiveCallback(host, port)
|
|||||||
-- NOTE: Closure trickery because we need a reference to *this* self *inside* the callback,
|
-- NOTE: Closure trickery because we need a reference to *this* self *inside* the callback,
|
||||||
-- which will be called as a function from another object (namely, StreamMessageQueue).
|
-- which will be called as a function from another object (namely, StreamMessageQueue).
|
||||||
local this = self
|
local this = self
|
||||||
return function(data)
|
return function(t)
|
||||||
|
local data = table.concat(t)
|
||||||
this:onReceiveJSON(data)
|
this:onReceiveJSON(data)
|
||||||
if not this.connect_message then
|
if not this.connect_message then
|
||||||
this.password_check_callback = function()
|
this.password_check_callback = function()
|
||||||
@ -546,7 +547,8 @@ function CalibreWireless:sendBook(arg)
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
-- switching to raw data receiving mode
|
-- switching to raw data receiving mode
|
||||||
self.calibre_socket.receiveCallback = function(data)
|
self.calibre_socket.receiveCallback = function(t)
|
||||||
|
local data = table.concat(t)
|
||||||
--logger.info("receive file data", #data)
|
--logger.info("receive file data", #data)
|
||||||
--logger.info("Memory usage KB:", collectgarbage("count"))
|
--logger.info("Memory usage KB:", collectgarbage("count"))
|
||||||
local to_write_data = data:sub(1, to_write_bytes)
|
local to_write_data = data:sub(1, to_write_bytes)
|
||||||
|
Loading…
Reference in New Issue
Block a user