mirror of
https://github.com/koreader/koreader
synced 2024-11-10 01:10:34 +00:00
czmq & libzmq: update to 4.2.1 & 4.3.5 respectively (#12350)
This commit is contained in:
parent
6de2a59bd8
commit
c2d58e525e
2
base
2
base
@ -1 +1 @@
|
||||
Subproject commit d4e4cae2ec1d89d02ce9036a5404c8f8dc8baf50
|
||||
Subproject commit 99ddf05e3657c9f6c31d53a05023a9d598e8d8ec
|
@ -2,7 +2,7 @@ local ffi = require("ffi")
|
||||
local logger = require("logger")
|
||||
local MessageQueue = require("ui/message/messagequeue")
|
||||
local _ = require("ffi/zeromq_h")
|
||||
local czmq = ffi.load("libs/libczmq.so.1")
|
||||
local czmq = ffi.load("libs/libczmq.so.4")
|
||||
local filemq = ffi.load("libs/libfmq.so.1")
|
||||
|
||||
local FileMessageQueue = MessageQueue:extend{
|
||||
|
@ -3,7 +3,7 @@ local Event = require("ui/event")
|
||||
local logger = require("logger")
|
||||
|
||||
local _ = require("ffi/zeromq_h")
|
||||
local czmq = ffi.load("libs/libczmq.so.1")
|
||||
local czmq = ffi.load("libs/libczmq.so.4")
|
||||
|
||||
local MessageQueue = {}
|
||||
|
||||
|
@ -3,8 +3,8 @@ local logger = require("logger")
|
||||
local MessageQueue = require("ui/message/messagequeue")
|
||||
|
||||
local _ = require("ffi/zeromq_h")
|
||||
local zmq = ffi.load("libs/libzmq.so.4")
|
||||
local czmq = ffi.load("libs/libczmq.so.1")
|
||||
local zmq = ffi.load("libs/libzmq.so.5")
|
||||
local czmq = ffi.load("libs/libczmq.so.4")
|
||||
local C = ffi.C
|
||||
|
||||
local StreamMessageQueue = MessageQueue:extend{
|
||||
@ -13,21 +13,25 @@ local StreamMessageQueue = MessageQueue:extend{
|
||||
}
|
||||
|
||||
function StreamMessageQueue:start()
|
||||
self.context = czmq.zctx_new()
|
||||
self.socket = czmq.zsocket_new(self.context, C.ZMQ_STREAM)
|
||||
self.poller = czmq.zpoller_new(self.socket, nil)
|
||||
local endpoint = string.format("tcp://%s:%d", self.host, self.port)
|
||||
self.socket = czmq.zsock_new(C.ZMQ_STREAM)
|
||||
if not self.socket then
|
||||
error("cannot create socket for endpoint " .. endpoint)
|
||||
end
|
||||
logger.dbg("connecting to endpoint", endpoint)
|
||||
local rc = czmq.zsocket_connect(self.socket, endpoint)
|
||||
if rc ~= 0 then
|
||||
if czmq.zsock_connect(self.socket, endpoint) ~= 0 then
|
||||
error("cannot connect to " .. endpoint)
|
||||
end
|
||||
local id_size = ffi.new("size_t[1]", 256)
|
||||
local id_size = ffi.new("size_t[1]", 255)
|
||||
local buffer = ffi.new("uint8_t[?]", id_size[0])
|
||||
--- @todo: Check return of zmq_getsockopt()
|
||||
zmq.zmq_getsockopt(self.socket, C.ZMQ_IDENTITY, buffer, id_size)
|
||||
if zmq.zmq_getsockopt(czmq.zsock_resolve(self.socket), C.ZMQ_IDENTITY, buffer, id_size) ~= 0 then
|
||||
error("cannot get socket identity for endpoint " .. endpoint)
|
||||
end
|
||||
self.id = ffi.string(buffer, id_size[0])
|
||||
logger.dbg("id", #self.id, self.id)
|
||||
self.poller = czmq.zpoller_new(self.socket, nil)
|
||||
if not self.poller then
|
||||
error("cannot create poller for endpoint " .. endpoint)
|
||||
end
|
||||
end
|
||||
|
||||
function StreamMessageQueue:stop()
|
||||
@ -35,10 +39,7 @@ function StreamMessageQueue:stop()
|
||||
czmq.zpoller_destroy(ffi.new('zpoller_t *[1]', self.poller))
|
||||
end
|
||||
if self.socket ~= nil then
|
||||
czmq.zsocket_destroy(self.context, self.socket)
|
||||
end
|
||||
if self.context ~= nil then
|
||||
czmq.zctx_destroy(ffi.new('zctx_t *[1]', self.context))
|
||||
czmq.zsock_destroy(ffi.new('zsock_t *[1]', self.socket))
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -3,7 +3,8 @@ local logger = require("logger")
|
||||
local MessageQueue = require("ui/message/messagequeue")
|
||||
|
||||
local _ = require("ffi/zeromq_h")
|
||||
local czmq = ffi.load("libs/libczmq.so.1")
|
||||
local zmq = ffi.load("libs/libzmq.so.5")
|
||||
local czmq = ffi.load("libs/libczmq.so.4")
|
||||
local C = ffi.C
|
||||
|
||||
local StreamMessageQueueServer = MessageQueue:extend{
|
||||
@ -12,15 +13,18 @@ local StreamMessageQueueServer = MessageQueue:extend{
|
||||
}
|
||||
|
||||
function StreamMessageQueueServer:start()
|
||||
self.context = czmq.zctx_new()
|
||||
self.socket = czmq.zsocket_new(self.context, C.ZMQ_STREAM)
|
||||
self.poller = czmq.zpoller_new(self.socket, nil)
|
||||
local endpoint = string.format("tcp://%s:%d", self.host, self.port)
|
||||
logger.dbg("StreamMessageQueueServer: Binding to endpoint", endpoint)
|
||||
local rc = czmq.zsocket_bind(self.socket, endpoint)
|
||||
-- If success, rc is port number
|
||||
if rc == -1 then
|
||||
logger.err("StreamMessageQueueServer: Cannot bind to ", endpoint)
|
||||
self.socket = czmq.zsock_new(C.ZMQ_STREAM)
|
||||
if not self.socket then
|
||||
error("cannot create socket for endpoint " .. endpoint)
|
||||
end
|
||||
logger.dbg("binding to endpoint", endpoint)
|
||||
if czmq.zsock_bind(self.socket, endpoint) == -1 then
|
||||
error("cannot bind to " .. endpoint)
|
||||
end
|
||||
self.poller = czmq.zpoller_new(self.socket, nil)
|
||||
if not self.poller then
|
||||
error("cannot create poller for endpoint " .. endpoint)
|
||||
end
|
||||
end
|
||||
|
||||
@ -29,10 +33,7 @@ function StreamMessageQueueServer:stop()
|
||||
czmq.zpoller_destroy(ffi.new('zpoller_t *[1]', self.poller))
|
||||
end
|
||||
if self.socket ~= nil then
|
||||
czmq.zsocket_destroy(self.context, self.socket)
|
||||
end
|
||||
if self.context ~= nil then
|
||||
czmq.zctx_destroy(ffi.new('zctx_t *[1]', self.context))
|
||||
czmq.zsock_destroy(ffi.new('zsock_t *[1]', self.socket))
|
||||
end
|
||||
end
|
||||
|
||||
@ -74,13 +75,13 @@ end
|
||||
|
||||
function StreamMessageQueueServer:send(data, id_frame)
|
||||
czmq.zframe_send(ffi.new('zframe_t *[1]', id_frame), self.socket, C.ZFRAME_MORE + C.ZFRAME_REUSE)
|
||||
czmq.zmq_send(self.socket, ffi.cast("unsigned char*", data), #data, C.ZFRAME_MORE)
|
||||
zmq.zmq_send(czmq.zsock_resolve(self.socket), ffi.cast("unsigned char*", data), #data, C.ZFRAME_MORE)
|
||||
-- Note: We can't use czmq.zstr_send(self.socket, data), which would stop on the first
|
||||
-- null byte in data (Lua strings can have null bytes inside).
|
||||
|
||||
-- Close connection
|
||||
czmq.zframe_send(ffi.new('zframe_t *[1]', id_frame), self.socket, C.ZFRAME_MORE)
|
||||
czmq.zmq_send(self.socket, nil, 0, 0)
|
||||
zmq.zmq_send(czmq.zsock_resolve(self.socket), nil, 0, 0)
|
||||
end
|
||||
|
||||
return StreamMessageQueueServer
|
||||
|
Loading…
Reference in New Issue
Block a user