Remove unused/obsolete plugins code and mentions

Signed-off-by: Manos Pitsidianakis <manos@pitsidianak.is>
pull/482/head
Manos Pitsidianakis 1 month ago
parent c051190114
commit e9a72072bf
No known key found for this signature in database
GPG Key ID: 7729C7707F7E09D0

@ -649,7 +649,7 @@ catchall for general errors
process panic
.El
.Sh ENVIRONMENT
.Bl -tag -width "$XDG_CONFIG_HOME/meli/plugins/*" -offset indent
.Bl -tag -width "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" -offset indent
.It Ev EDITOR
Specifies the editor to use
.It Ev MELI_CONFIG
@ -665,7 +665,7 @@ overrides this.
.Sh FILES
.Nm
uses the following parts of the XDG standard:
.Bl -tag -width "$XDG_CONFIG_HOME/meli/plugins/*" -offset indent
.Bl -tag -width "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" -offset indent
.It Ev XDG_CONFIG_HOME
defaults to
.Pa ~/.config/
@ -675,17 +675,13 @@ defaults to
.El
.Pp
and appropriates the following locations:
.Bl -tag -width "$XDG_CONFIG_HOME/meli/plugins/*" -offset indent
.Bl -tag -width "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" -offset indent
.It Pa $XDG_CONFIG_HOME/meli/
User configuration directory
.It Pa $XDG_CONFIG_HOME/meli/config.toml
User configuration file, see
.Xr meli.conf 5
for its syntax and values.
.It Pa $XDG_CONFIG_HOME/meli/hooks/*
Reserved for event hooks.
.It Pa $XDG_CONFIG_HOME/meli/plugins/*
Reserved for plugin files.
.It Pa $XDG_CACHE_HOME/meli/*
Internal cached data used by meli.
.It Pa $XDG_DATA_HOME/meli/*

@ -1,303 +0,0 @@
/*
* meli - ui plugins
*
* Copyright 2019 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
//! Plugins are executed by meli and communication is done by `messagepack` IPC.
use melib::error::{Error, Result};
use std::collections::HashMap;
use std::io::Write;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
use std::process::Stdio;
use uuid::Uuid;
//pub mod backend;
pub mod rpc;
pub use rpc::*;
pub const BACKEND_FN: i8 = 0;
pub const BACKEND_OP_FN: i8 = 1;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PluginKind {
LongLived,
Filter,
Backend,
}
impl Default for PluginKind {
fn default() -> Self {
Self::LongLived
}
}
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct Plugin {
kind: PluginKind,
executable: String,
name: String,
#[serde(default)]
hooks: Vec<String>,
}
impl Plugin {
pub fn kind(&self) -> PluginKind {
self.kind
}
}
#[derive(Debug)]
pub struct PluginManager {
plugins: HashMap<String, Plugin>,
sessions: HashMap<Uuid, String>,
instances: HashMap<Uuid, std::process::Child>,
streams: HashMap<Uuid, RpcChannel>,
hooks: HashMap<String, UIHook>,
listener: UnixListener,
}
fn socket_path() -> PathBuf {
xdg::BaseDirectories::new()
.and_then(|base_dirs| {
base_dirs
.place_runtime_file("meli-plugins")
.or_else(|_| base_dirs.place_cache_file("meli-plugins"))
.or_else(|_| {
let mut p = base_dirs.get_cache_home();
p.push("meli-plugins");
Ok(p)
})
})
.unwrap_or_else(|_| PathBuf::from("."))
}
impl Drop for PluginManager {
fn drop(&mut self) {
let _ = std::fs::remove_file(&socket_path());
for (k, c) in self.instances.iter_mut() {
if let Err(err) = debug!(c.kill()) {
eprintln!(
"Error: could not kill process {} spawned by plugin {} ({})",
c.id(),
&self.plugins[&self.sessions[k]].name,
err
);
}
}
}
}
impl PluginManager {
pub fn new() -> Self {
let socket_path = socket_path();
let _ = std::fs::remove_file(&socket_path);
let listener = UnixListener::bind(&socket_path).unwrap();
/*
debug!("bound");
// accept connections and process them, spawning a new thread for each one
thread::spawn(move || {
debug!("spawn");
let stream = listener.accept();
debug!("socket stream {:?}", &stream);
match stream {
Ok((mut stream, _)) => {
debug!("socket stream {:?}", &stream);
/* connection succeeded */
thread::spawn(move || {
debug!("socket listen {:?}", &stream);
debug!(initialize(stream));
//let mut response = Vec::new();
//debug!(stream.read_to_end(&mut response));
//loop {
// debug!("pre-flush 1");
// stream.flush();
// debug!("post-flush 1");
// if debug!(rmpv::decode::value::read_value(&mut stream)).is_err() {
// return;
// }
// debug!("post-read_value");
// //debug!("socket response {}", unsafe {
// // String::from_utf8_lossy(&response)
// //});
// stream.flush();
// debug!("post-flush 2");
// if debug!(rmpv::encode::write_value(
// &mut stream,
// &rmpv::Value::String("hello 2 u 2".into())
// ))
// .is_err()
// {
// return;
// }
// debug!("post-write_value");
//}
});
}
Err(err) => {
/* connection failed */
debug!(err);
}
}
});
*/
let mut hooks: HashMap<String, UIHook> = Default::default();
hooks.insert(
"attachment-view".to_string(),
UIHook {
name: "attachment-view".to_string(),
wait_response: true,
listeners: Vec::new(),
},
);
hooks.insert(
"refresh-account".to_string(),
UIHook {
name: "refresh-account".to_string(),
wait_response: false,
listeners: Vec::new(),
},
);
PluginManager {
plugins: Default::default(),
sessions: Default::default(),
instances: Default::default(),
streams: Default::default(),
hooks,
listener,
}
}
pub fn register(&mut self, plugin: Plugin) -> Result<()> {
debug!(&plugin);
match plugin.kind {
PluginKind::LongLived => {
/* spawn thread */
let inv = &plugin.executable;
let child = std::process::Command::new("sh")
.args(&["-c", inv])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let (stream, _) = self.listener.accept()?;
/* send init message to plugin to register hooks */
let session = Uuid::new_v4();
let channel = RpcChannel::new(stream, &session)?;
for h in &plugin.hooks {
self.add_listener(h, session);
}
self.instances.insert(session, child);
self.sessions.insert(session, plugin.name.clone());
self.streams.insert(session, channel);
self.plugins.insert(plugin.name.clone(), plugin);
Ok(())
}
PluginKind::Filter => {
let session = Uuid::new_v4();
for h in &plugin.hooks {
self.add_listener(h, session);
}
self.sessions.insert(session, plugin.name.clone());
self.plugins.insert(plugin.name.clone(), plugin);
/* send init message to plugin to register hooks */
Ok(())
}
PluginKind::Backend => {
self.plugins.insert(plugin.name.clone(), plugin);
/* send init message to plugin to register hooks */
Ok(())
}
}
}
pub fn register_hook(&mut self, hook: UIHook) {
self.hooks.insert(hook.name.clone(), hook);
}
pub fn add_listener(&mut self, hook: &str, session: Uuid) {
self.hooks
.entry(hook.to_string())
.and_modify(|entry| entry.listeners.push(session));
}
pub fn activate_hook(&mut self, hook: &str, bytes: Vec<u8>) -> Result<FilterResult> {
debug!("activate_hook {}", hook);
debug!("bytes {:?}", &bytes);
for l in &self.hooks[hook].listeners {
let plugin = &self.plugins[&self.sessions[l]];
debug!(&plugin);
match &plugin.kind {
PluginKind::LongLived => {
debug!("listener: {}", l);
let channel = self.streams.get_mut(l).unwrap();
channel.write_ref(&rmpv::ValueRef::Binary(bytes.as_slice()))?;
let reply: Result<FilterResult> = channel.from_read();
return reply;
}
PluginKind::Filter => {
let inv = &plugin.executable;
let mut child = std::process::Command::new("sh")
.args(&["-c", inv])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let (stream, _) = self.listener.accept()?;
let mut channel = RpcChannel::new(stream, l)?;
channel.write_ref(&rmpv::ValueRef::Binary(bytes.as_slice()))?;
let reply: Result<FilterResult> = channel.from_read();
child.kill()?;
return reply;
}
k => {
debug!("got plugin kind {:?} in hook {}", k, hook);
}
}
}
Err(Error::new("no listeners for this hook"))
}
pub fn listener(&self) -> UnixListener {
self.listener.try_clone().unwrap()
}
}
#[derive(Debug)]
pub struct UIHook {
name: String,
wait_response: bool,
listeners: Vec<Uuid>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "t", content = "c")]
pub enum FilterResult {
UiMessage(String),
Text(String),
Ansi(String),
Binary(Vec<u8>),
Error(String),
}

@ -1,303 +0,0 @@
/*
* meli - plugins
*
* Copyright 2019 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::*;
use melib::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext};
use melib::backends::*;
use melib::conf::AccountSettings;
use melib::email::{Envelope, EnvelopeHash, Flag};
use melib::error::{Error, Result};
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
// fields/interface/deserializing
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SimpleEnvelope {
hash: EnvelopeHash,
subject: String,
from: String,
to: String,
date: String,
message_id: String,
references: String,
}
#[derive(Debug)]
pub struct PluginBackend {
plugin: Plugin,
child: std::process::Child,
channel: Arc<Mutex<RpcChannel>>,
collection: melib::Collection,
is_online: Arc<Mutex<(std::time::Instant, Result<()>)>>,
}
impl Drop for PluginBackend {
fn drop(&mut self) {
if let Err(err) = debug!(self.child.kill()) {
eprintln!(
"Error: could not kill process {} spawned by plugin {} ({})",
self.child.id(),
&self.plugin.name,
err
);
}
}
}
impl MailBackend for PluginBackend {
fn capabilities(&self) -> MailBackendCapabilities {
const CAPABILITIES: MailBackendCapabilities = MailBackendCapabilities {
is_async: false,
is_remote: false,
supports_search: false,
extensions: None,
supports_tags: false,
supports_submission: false,
};
CAPABILITIES
}
fn is_online(&self) -> Result<()> {
if let Ok(mut is_online) = self.is_online.try_lock() {
let now = std::time::Instant::now();
if now.duration_since(is_online.0) >= std::time::Duration::new(2, 0) {
if let Ok(mut channel) = self.channel.try_lock() {
channel.write_ref(&rmpv::ValueRef::Ext(BACKEND_FN, b"is_online"))?;
debug!(channel.expect_ack())?;
let ret: PluginResult<()> = debug!(channel.from_read())?;
is_online.0 = now;
is_online.1 = ret.into();
}
}
is_online.1.clone()
} else {
Err(Error::new("busy"))
}
}
fn fetch(&mut self, _mailbox_hash: MailboxHash) -> Result<Async<Result<Vec<Envelope>>>> {
let mut w = AsyncBuilder::new();
let channel = self.channel.clone();
let handle = {
let tx = w.tx();
let closure = move |_work_context| {
let mut channel = channel.lock().unwrap();
channel
.write_ref(&rmpv::ValueRef::Ext(BACKEND_FN, b"get"))
.unwrap();
channel.expect_ack().unwrap();
loop {
let read_val: Result<PluginResult<Option<Vec<SimpleEnvelope>>>> =
channel.from_read();
match read_val.map(Into::into).and_then(std::convert::identity) {
Ok(Some(a)) => {
tx.send(AsyncStatus::Payload(Ok(a
.into_iter()
.filter_map(
|SimpleEnvelope {
hash,
date,
from,
to,
subject,
message_id,
references,
}| {
let mut env = melib::Envelope::new(hash);
env.set_date(date.as_bytes());
if let Ok(d) =
melib::email::parser::generic::date(date.as_bytes())
{
env.set_datetime(d);
}
env.set_message_id(message_id.as_bytes());
let parse_result =
melib::email::parser::address::rfc2822address_list(
from.as_bytes(),
);
if parse_result.is_ok() {
let value = parse_result.unwrap().1;
env.set_from(value);
}
let parse_result =
melib::email::parser::address::rfc2822address_list(
to.as_bytes(),
);
if parse_result.is_ok() {
let value = parse_result.unwrap().1;
env.set_to(value);
}
let parse_result = melib::email::parser::encodings::phrase(
subject.as_bytes(),
false,
);
if parse_result.is_ok() {
let value = parse_result.unwrap().1;
env.set_subject(value);
}
if !references.is_empty() {
env.set_references(references.as_bytes());
}
Some(env)
},
)
.collect::<Vec<Envelope>>())))
.unwrap();
}
Ok(None) => {
tx.send(AsyncStatus::Finished).unwrap();
return;
}
Err(err) => {
tx.send(AsyncStatus::Payload(Err(err))).unwrap();
tx.send(AsyncStatus::Finished).unwrap();
return;
}
};
}
};
Box::new(closure)
};
Ok(w.build(handle))
}
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
let mut ret: HashMap<MailboxHash, Mailbox> = Default::default();
ret.insert(0, Mailbox::default());
Ok(Box::pin(async { Ok(ret) }))
}
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>> {
Ok(Box::new(PluginOp {
hash,
channel: self.channel.clone(),
bytes: None,
}))
}
fn save(
&self,
_bytes: Vec<u8>,
_mailbox_hash: MailboxHash,
_flags: Option<Flag>,
) -> ResultFuture<()> {
Err(Error::new("Saving is currently unimplemented for plugins"))
}
fn create_mailbox(
&mut self,
_name: String,
) -> ResultFuture<(MailboxHash, HashMap<MailboxHash, Mailbox>)> {
Err(Error::new(
"Creating a mailbox is currently unimplemented for plugins",
))
}
fn collection(&self) -> melib::Collection {
self.collection.clone()
}
fn as_any(&self) -> &dyn ::std::any::Any {
self
}
}
impl PluginBackend {
pub fn new(
listener: UnixListener,
plugin: Plugin,
_s: &AccountSettings,
_is_subscribed: Box<dyn Fn(&str) -> bool>,
_ev: melib::backends::BackendEventConsumer,
) -> Result<Box<dyn MailBackend>> {
if plugin.kind != PluginKind::Backend {
return Err(Error::new(format!(
"Error: Plugin `{}` is not a mail backend plugin, it's `{:?}`",
&plugin.name, &plugin.kind
)));
}
let inv = &plugin.executable;
let child = std::process::Command::new("sh")
.args(&["-c", inv])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let (stream, _) = listener.accept()?;
/* send init message to plugin to register hooks */
let session = Uuid::new_v4();
let channel = RpcChannel::new(stream, &session)?;
let now = std::time::Instant::now() - std::time::Duration::from_secs(5);
Ok(Box::new(PluginBackend {
child,
plugin,
channel: Arc::new(Mutex::new(channel)),
collection: Default::default(),
is_online: Arc::new(Mutex::new((now, Err(Error::new("Uninitialized"))))),
}))
}
pub fn register(listener: UnixListener, plugin: Plugin, backends: &mut Backends) {
backends.register(
plugin.name.clone(),
Backend {
create_fn: Box::new(move || {
let plugin = plugin.clone();
let listener = listener.try_clone().unwrap();
Box::new(move |f, i, ev| {
let plugin = plugin.clone();
let listener = listener.try_clone().unwrap();
PluginBackend::new(listener, plugin, f, i, ev)
})
}),
validate_conf_fn: Box::new(|_| Ok(())),
},
);
}
}
#[derive(Debug)]
struct PluginOp {
hash: EnvelopeHash,
channel: Arc<Mutex<RpcChannel>>,
bytes: Option<String>,
}
impl BackendOp for PluginOp {
fn as_bytes(&self) -> ResultFuture<Vec<u8>> {
let hash = self.hash;
let channel = self.channel.clone();
Ok(Box::pin(async move {
if let Ok(mut channel) = channel.try_lock() {
channel.write_ref(&rmpv::ValueRef::Ext(BACKEND_OP_FN, b"as_bytes"))?;
debug!(channel.expect_ack())?;
channel.write_ref(&rmpv::ValueRef::Integer(hash.into()))?;
debug!(channel.expect_ack())?;
let bytes: Result<PluginResult<String>> = channel.from_read();
Ok(bytes
.map(Into::into)
.and_then(std::convert::identity)?
.into_bytes())
} else {
Err(Error::new("busy"))
}
}))
}
}

@ -1,48 +0,0 @@
#! /usr/bin/env python3
"""
meli - sample plugin
Copyright 2019 Manos Pitsidianakis
This file is part of meli.
meli is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
meli is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with meli. If not, see <http://www.gnu.org/licenses/>.
"""
import sys
import subprocess
print(sys.path, file=sys.stderr)
from libmeliapi import Client
if __name__ == "__main__":
server_address = './soworkfile'
client = Client(server_address)
client.connect()
try:
_bytes = client.read()
print('got bytes {!r}'.format(_bytes),file=sys.stderr, )
# run() returns a CompletedProcess object if it was successful
# errors in the created process are raised here too
process = subprocess.run(['tiv','-w', '120','-h', '40', _bytes[0]], check=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout
print('tiv output len {}'.format(len(output)),file=sys.stderr, )
#print('tiv output bytes {!r}'.format(output),file=sys.stderr, )
message = { "t": "ansi", "c": output }
#print('sending {!r}'.format(message),file=sys.stderr, )
print('returned :', client.send(message), file=sys.stderr,)
except Exception as msg:
print(msg, file=sys.stderr,)

@ -1,179 +0,0 @@
"""
meli - python3 api plugin
Copyright 2019 Manos Pitsidianakis
This file is part of meli.
meli is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
meli is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with meli. If not, see <http://www.gnu.org/licenses/>.
"""
from collections import deque
import errno
import json
import msgpack
import socket
import struct
import sys
import time
class IPCError(Exception):
pass
class UnknownMessageClass(IPCError):
pass
class InvalidSerialization(IPCError):
pass
class ConnectionClosed(IPCError):
pass
def _read_objects(sock):
unpacker = msgpack.Unpacker()
ret = []
#reader = socket.socket.makefile(sock, 'rb')
while True:
try:
buf = sock.recv(1024**2)
if not buf:
break
unpacker.feed(buf)
for o in unpacker:
ret.append(o)
except:
break
return ret
#try:
# for unpack in unpacker:
# return unpack
#except Exception as e:
# print("[libmeliapi]: ", "_read_objects error ", e, file=sys.stderr,)
# return None
#finally:
# reader.flush()
def _write_objects(sock, objects):
sys.stderr.flush()
print("[libmeliapi]: ", "_write_objects ", objects, flush=True, file=sys.stderr, )
data = msgpack.packb(objects)
#print("[libmeliapi]: ", "_write_objects data ", data, flush=True, file=sys.stderr, )
sent = 0
while sent < len(data):
try:
_len = min(len(data[sent:]), 2048)
sent += sock.send(data[sent:sent+_len])
except IOError as e:
print("[libmeliapi]: IOError: ", e, e.errno, flush=True, file=sys.stderr, )
sys.stderr.flush()
if e.errno == errno.EWOULDBLOCK:
break
elif e.errno == errno.EAGAIN:
time.sleep(0.001)
continue
else:
raise
class Client(object):
def __init__(self, server_address):
self.buffer = deque()
self.addr = server_address
address_family = socket.AF_UNIX
self.sock = socket.socket(address_family, socket.SOCK_STREAM)
self.sock.setblocking(0)
def connect(self):
try:
self.sock.connect(self.addr)
print("[libmeliapi]: ", "self.send({ \"version\": \"dev\" }) = ",self.send({ "version": "dev" }), flush=True, file=sys.stderr)
self.expect_ack()
self._session = self.read()
self.ack()
print("[libmeliapi]: ", "self.buffer =", self.buffer, flush=True, file=sys.stderr, )
print("[libmeliapi]: ", "connected, session id is", self._session, flush=True, file=sys.stderr)
except socket.error as msg:
print("[libmeliapi]: ", msg, flush=True, file=sys.stderr, )
sys.stderr.flush()
sys.exit(1)
def close(self):
self.sock.close()
def setblocking(self, new_val):
self.sock.setblocking(new_val)
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def send(self, objects):
sys.stderr.flush()
#print("[libmeliapi]: ", "stuck in send ", self.buffer, flush=True, file=sys.stderr, )
_write_objects(self.sock, objects)
#print("[libmeliapi]: ", "unstuck wrote objs", flush=True, file=sys.stderr, )
#print("[libmeliapi]: ", "wrote object ", objects, file=sys.stderr)
time.sleep(0.001)
def ack(self):
sys.stderr.flush()
_write_objects(self.sock, 0x06)
time.sleep(0.001)
def expect_ack(self):
#print("[libmeliapi]: expect_ack, ", self.buffer, flush=True, file=sys.stderr, )
while True:
time.sleep(0.1)
read_list = _read_objects(self.sock)
self.buffer.extend(read_list)
try:
self.buffer.remove(0x6)
#print("[libmeliapi]: got_ack, ", self.buffer, flush=True, file=sys.stderr, )
return
except ValueError:
pass
def read(self):
sys.stderr.flush()
#print("[libmeliapi]: ", "stuck in read ", self.buffer, flush=True, file=sys.stderr, )
read_list = _read_objects(self.sock)
time.sleep(0.01)
self.buffer.extend(read_list)
#print("[libmeliapi]: ", "unstuck read self.buffer =", self.buffer, flush=True, file=sys.stderr, )
if len(self.buffer) > 0:
return self.buffer.popleft()
else:
return None
@property
def backend_fn_type(self):
return 0
@property
def backend_op_fn_type(self):
return 1
def ok_send(self, objects):
self.send({"t": "ok", "c": objects })
self.expect_ack()
def err_send(self, objects):
self.send({"t": "err", "c": objects })
self.expect_ack()

@ -1,119 +0,0 @@
#! /usr/bin/env python3
"""
meli - sample plugin
Copyright 2019 Manos Pitsidianakis
This file is part of meli.
meli is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
meli is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with meli. If not, see <http://www.gnu.org/licenses/>.
"""
import sys
import time
import subprocess
import msgpack
import nntplib
import libmeliapi
import itertools
def chunks(iterable, n):
while True:
try:
yield itertools.chain((next(iterable),), itertools.islice(iterable, n-1))
except:
break
class NNTPClient(libmeliapi.Client):
def __init__(self, stream_address, server_address, newsgroup):
super().__init__(stream_address)
self.bytes_cache = {}
self.conn = nntplib.NNTP(server_address)
self.newsgroup = newsgroup
def backend_req(self, req):
print("[nntp-plugin]: backend_req = ", req, flush=True, file=sys.stderr)
if req.data == b'is_online':
self.ok_send(None)
elif req.data == b'get':
resp, count, first, last, name = self.conn.group(self.newsgroup)
print('Group', name, 'has', count, 'articles, range', first, 'to', last, flush=True, file=sys.stderr)
resp, overviews = self.conn.over((0, last))
for chunk in chunks(iter(reversed(overviews)), 100):
ret = []
for id, over in chunk:
#print(id, nntplib.decode_header(over['subject']), flush=True, file=sys.stderr)
env = {}
env["hash"] = id
env["subject"] = nntplib.decode_header(over["subject"])
env["from"] = nntplib.decode_header(over["from"])
env["date"] = nntplib.decode_header(over["date"])
env["message_id"] = nntplib.decode_header(over["message-id"])
env["references"] = nntplib.decode_header(over["references"])
try:
env["to"] = nntplib.decode_header(over["to"])
except KeyError:
env["to"] = self.newsgroup
ret.append(env)
print("ret len = ", len(ret), flush=True,file=sys.stderr)
self.ok_send(ret)
self.ok_send(None)
def backend_op_req(self, req):
print("[nntp-plugin]: backend_op_req = ", req, flush=True, file=sys.stderr)
if req.data == b'as_bytes':
_hash = self.read()
print("[nntp-plugin]: hash = ", _hash, flush=True, file=sys.stderr)
self.ack()
try:
try:
self.ok_send(self.bytes_cache[_hash])
except KeyError:
resp, info = self.conn.article(_hash)
#print(_id, " line0 = ", str(info.lines[0], 'utf-8', 'ignore'))
elem = b'\n'.join(info.lines)
self.bytes_cache[_hash] = str(elem, 'utf-8', 'ignore')
self.ok_send(self.bytes_cache[_hash])
except Exception as e:
self.err_send(str(e))
if __name__ == "__main__":
import importlib
importlib.reload(libmeliapi)
stream_address = './soworkfile'
server_address = 'news.gmane.org'
newsgroup = 'gmane.comp.python.committers'
client = NNTPClient(stream_address, server_address, newsgroup)
client.connect()
#client.setblocking(True)
try:
while True:
req = client.read()
if req is None:
time.sleep(0.15)
continue
#client.setblocking(True)
client.ack()
print("[nntp-plugin]: ", "req: ", req, flush=True, file=sys.stderr)
sys.stderr.flush()
if isinstance(req, msgpack.ExtType):
if req.code == client.backend_fn_type:
client.backend_req(req)
elif req.code == client.backend_op_fn_type:
client.backend_op_req(req)
print("[nntp-plugin]: ", req, flush=True, file=sys.stderr)
#client.setblocking(True)
time.sleep(0.15)
except:
raise RuntimeError("Something bad happened")

@ -1,142 +0,0 @@
/*
* meli - plugins
*
* Copyright 2019 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::*;
#[derive(Debug)]
pub struct RpcChannel {
stream: UnixStream,
session: Uuid,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PluginGreeting {
version: String,
}
impl RpcChannel {
pub fn new(stream: UnixStream, session: &Uuid) -> Result<RpcChannel> {
let mut ret = RpcChannel {
stream,
session: *session,
};
let greeting: PluginGreeting = ret
.from_read()
.map_err(|err| Error::new(format!("Could not get correct plugin greeting: {}", err)))?;
debug!(&greeting);
//if greeting.version != "dev" {
// return Err("Plugin is not compatible with our API (dev)".into());
//}
ret.write_ref(&rmpv::ValueRef::String(session.to_string().as_str().into()))?;
debug!(ret.expect_ack())?;
Ok(ret)
}
pub fn expect_ack(&mut self) -> Result<()> {
debug!("expect_ack()");
let ack: u32 = debug!(rmp_serde::decode::from_read(&mut self.stream))
.map_err(|_| Error::new("Plugin did not return ACK."))?;
if 0x6 == ack {
Ok(())
} else {
Err(Error::new("Plugin did not return ACK."))
}
}
pub fn ack(&mut self) -> Result<()> {
debug!("ack()");
debug!(rmpv::encode::write_value_ref(
&mut self.stream,
&rmpv::ValueRef::Integer(0x6.into())
))
.map_err(|err| Error::new(err.to_string()))?;
let _ = self.stream.flush();
Ok(())
}
pub fn write_ref(&mut self, value_ref: &rmpv::ValueRef) -> Result<()> {
debug!("write_ref() {:?}", value_ref);
debug!(rmpv::encode::write_value_ref(&mut self.stream, value_ref))
.map_err(|err| Error::new(err.to_string()))?;
let _ = self.stream.flush();
Ok(())
}
pub fn read(&mut self) -> Result<rmpv::Value> {
debug!("read()");
let ret: RpcResult = debug!(rmp_serde::decode::from_read(&mut self.stream))
.map_err(|err| Error::new(err.to_string()))?;
let _ = self.stream.flush();
self.ack()?;
debug!("read() ret={:?}", &ret);
ret.into()
}
pub fn from_read<T>(&mut self) -> Result<T>
where
T: std::fmt::Debug + serde::de::DeserializeOwned,
{
debug!("from_read()");
let ret: Result<T> = debug!(rmp_serde::decode::from_read(&mut self.stream))
.map_err(|err| Error::new(err.to_string()));
let _ = self.stream.flush();
self.ack()?;
debug!("read() ret={:?}", &ret);
ret
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "t", content = "c")]
enum RpcResult {
Ok(rmpv::Value),
Err(String),
}
impl RpcResult {
fn into(self) -> Result<rmpv::Value> {
match self {
RpcResult::Ok(v) => Ok(v),
RpcResult::Err(err) => Err(Error::new(err)),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "t", content = "c")]
pub enum PluginResult<T: std::fmt::Debug + Clone> {
Ok(T),
Err(String),
}
impl<T: std::fmt::Debug + Clone + serde::Serialize + serde::de::DeserializeOwned> Into<Result<T>>
for PluginResult<T>
{
fn into(self) -> Result<T> {
match self {
PluginResult::Ok(v) => Ok(v),
PluginResult::Err(err) => Err(Error::new(err)),
}
}
}

@ -343,10 +343,8 @@ impl State {
sender: Sender<ThreadEvent>,
receiver: Receiver<ThreadEvent>,
) -> Result<Self> {
/*
* Create async channel to block the input-thread if we need to fork and stop
* it from reading stdin, see get_events() for details
*/
// Create async channel to block the input-thread if we need to fork and stop it
// from reading stdin, see get_events() for details
let input_thread = unbounded();
let input_thread_pipe = crate::types::pipe()?;
let backends = Backends::new();
@ -355,20 +353,6 @@ impl State {
} else {
Settings::new()?
});
/*
let mut plugin_manager = PluginManager::new();
for (_, p) in settings.plugins.clone() {
if crate::plugins::PluginKind::Backend == p.kind() {
debug!("registering {:?}", &p);
crate::plugins::backend::PluginBackend::register(
plugin_manager.listener(),
p.clone(),
&mut backends,
);
}
plugin_manager.register(p)?;
}
*/
let (cols, rows) = termion::terminal_size().chain_err_summary(|| {
"Could not determine terminal size. Are you running this on a tty? If yes, do you need \

Loading…
Cancel
Save