@ -11,12 +11,13 @@ use std::sync::{
} ;
use std ::{ collections ::HashSet , convert ::TryInto } ;
#[ allow(unused) ]
use crossbeam_channel ::{ unbounded , Receiver , Sender } ;
use eyre ::Result ;
use serde_json ::Value ;
use crate ::database ::{
database_like ::Database Like ,
database_like ::Database Query ,
query ::Query ,
query_result ::{ QueryResult , QueryRow } ,
} ;
@ -35,8 +36,14 @@ pub enum Response<Context: Send + 'static> {
/// Sending half used by the UI to push `(query, context)` pairs to the worker.
pub(super) type InputSender<Context> = Sender<(Query, Context)>;
/// Receiving half the UI polls for finished query responses.
// NOTE(review): "Reciever" is a typo for "Receiver", but the alias is public
// to the parent module — renaming it would break callers.
pub(super) type OutputReciever<Context> = Receiver<Result<Response<Context>>>;
// FIXME: Instead of this wasm mess, two different link types?
pub ( super ) struct Link < Context : Send + ' static > {
database : Box < dyn DatabaseQuery > ,
#[ cfg(not(target_arch = " wasm32 " )) ]
pub input_sender : InputSender < Context > ,
#[ cfg(not(target_arch = " wasm32 " )) ]
pub output_receiver : OutputReciever < Context > ,
// We need to account for the brief moment where the processing channel is empty
// but we're applying the results. If there is a UI update in this window,
@ -46,8 +53,47 @@ pub(super) struct Link<Context: Send + 'static> {
// put into the output channel. In order to account for all of this, we employ a
// request counter to know how many requests are currently in the pipeline
request_counter : Arc < AtomicUsize > ,
#[ cfg(target_arch = " wasm32 " ) ]
response : Vec < Response < Context > > ,
}
#[cfg(target_arch = "wasm32")]
impl<Context: Send + Sync + 'static> Link<Context> {
    /// Run `query` synchronously and buffer the response for [`Self::receive`].
    ///
    /// There is no worker thread on wasm, so the database is queried inline.
    /// New responses are inserted at the front while `receive` pops from the
    /// back, giving FIFO ordering.
    pub fn request(&mut self, query: &Query, context: Context) -> Result<()> {
        // `query` is already a reference; no need for `&query` (a `&&Query`).
        let result = self.database.query(query)?;
        // Fixed `if let Some(x) = res.ok()` anti-idiom: match `Ok` directly.
        // NOTE(review): a `process_query` error is silently dropped here,
        // while the native path forwards errors through the channel —
        // confirm this best-effort behavior is intended.
        if let Ok(response) = process_query(query.clone(), result, context) {
            self.response.insert(0, response);
        }
        Ok(())
    }

    /// Take the oldest buffered response, if any.
    pub fn receive(&mut self) -> Result<Option<Response<Context>>> {
        Ok(self.response.pop())
    }

    /// Whether any requests are still in the pipeline.
    // NOTE(review): nothing on the wasm path ever increments the counter,
    // so this is always false here; it exists for API parity with native.
    pub fn is_processing(&self) -> bool {
        self.request_counter.load(Ordering::Relaxed) > 0
    }

    /// Shared handle to the in-flight request counter.
    pub fn request_counter(&self) -> Arc<AtomicUsize> {
        self.request_counter.clone()
    }
}
/// Build a wasm [`Link`] that executes queries synchronously.
///
/// `_config` is accepted only for signature parity with the native `run`;
/// it is unused on wasm (underscore-prefixed to silence the warning —
/// callers are unaffected since Rust arguments are positional).
#[cfg(target_arch = "wasm32")]
pub(super) fn run<Context: Send + Sync + 'static, Database: DatabaseQuery + 'static>(
    _config: &Config,
    database: Database,
) -> Result<Link<Context>> {
    Ok(Link {
        // The `'static` bound on `Database` is required to box it as
        // `Box<dyn DatabaseQuery>` (whose default lifetime is `'static`).
        database: Box::new(database),
        request_counter: Arc::new(AtomicUsize::new(0)),
        response: Vec::new(),
    })
}
#[ cfg(not(target_arch = " wasm32 " )) ]
impl < Context : Send + Sync + ' static > Link < Context > {
pub fn request ( & mut self , query : & Query , context : Context ) -> Result < ( ) > {
self . request_counter . fetch_add ( 1 , Ordering ::Relaxed ) ;
@ -81,7 +127,8 @@ impl<Context: Send + Sync + 'static> Link<Context> {
}
}
pub ( super ) fn run < Context : Send + Sync + ' static , Database : DatabaseLike + ' static > (
#[ cfg(not(target_arch = " wasm32 " )) ]
pub ( super ) fn run < Context : Send + Sync + ' static , Database : DatabaseQuery > (
config : & Config ,
) -> Result < Link < Context > > {
// Create a new database connection, just for reading
@ -96,7 +143,8 @@ pub(super) fn run<Context: Send + Sync + 'static, Database: DatabaseLike + 'stat
} )
}
fn inner_loop < Context : Send + Sync + ' static , Database : DatabaseLike > (
#[ cfg(not(target_arch = " wasm32 " )) ]
fn inner_loop < Context : Send + Sync + ' static , Database : DatabaseQuery > (
database : Database ,
input_receiver : Receiver < ( Query , Context ) > ,
output_sender : Sender < Result < Response < Context > > > ,
@ -104,39 +152,54 @@ fn inner_loop<Context: Send + Sync + 'static, Database: DatabaseLike>(
loop {
let ( query , context ) = input_receiver . recv ( ) ? ;
let result = database . query ( & query ) ? ;
let response = match query {
Query ::Grouped { .. } = > {
let segmentations = calculate_segmentations ( & result ) ? ;
Response ::Grouped ( query , context , segmentations )
}
Query ::Normal { .. } = > {
let converted = calculate_rows ( & result ) ? ;
Response ::Normal ( query , context , converted )
}
Query ::Other { .. } = > {
let mut results = HashSet ::new ( ) ;
for entry in result {
match entry {
QueryResult ::Other ( field ) = > match field . value ( ) {
Value ::Array ( s ) = > {
for n in s {
if let Value ::String ( s ) = n {
if ! results . contains ( s ) {
results . insert ( s . to_owned ( ) ) ;
}
let response = process_query ( query , result , context ) ;
output_sender . send ( response ) ? ;
}
}
fn process_query < Context : Send + Sync + ' static > (
query : Query ,
result : Vec < QueryResult > ,
context : Context ,
) -> Result < Response < Context > > {
let response = match query {
Query ::Grouped { .. } = > {
let segmentations = calculate_segmentations ( & result ) ? ;
Response ::Grouped ( query , context , segmentations )
}
Query ::Normal { .. } = > {
let converted = calculate_rows ( & result ) ? ;
Response ::Normal ( query , context , converted )
}
Query ::Other { .. } = > {
let mut results = HashSet ::new ( ) ;
for entry in result {
match entry {
QueryResult ::Other ( field ) = > match field . value ( ) {
Value ::Array ( s ) = > {
for n in s {
if let Value ::String ( s ) = n {
if ! results . contains ( s ) {
results . insert ( s . to_owned ( ) ) ;
}
}
}
_ = > panic! ( "Should not end up here" ) ,
} ,
_ = > panic! ( "Should not end up here" ) ,
}
_ = > {
#[ cfg(debug_assertions) ]
panic! ( "Should not end up here" )
}
} ,
_ = > {
#[ cfg(debug_assertions) ]
panic! ( "Should not end up here" )
}
}
Response ::Other ( query , context , results . into_iter ( ) . collect ( ) )
}
} ;
output_sender . send ( Ok ( response ) ) ? ;
}
Response ::Other ( query , context , results . into_iter ( ) . collect ( ) )
}
} ;
Ok ( response )
}
fn calculate_segmentations ( result : & [ QueryResult ] ) -> Result < Segmentation > {