Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
fix(parity-clib): grumbles that were not addressed in #9920 (#10154)
Browse files Browse the repository at this point in the history
* fix(remove needless unsafe blocks)

* style(nits)

* fix(parity-clib): eliminate repetitive event loops

* revert(java bindings): safe rust -> unsafe rust

These functions can still end up with `UB` thus should be unsafe

* fix(grumbles): make Callback trait `pub (crate)`
  • Loading branch information
niklasad1 authored and 5chdn committed Feb 11, 2019
1 parent 751d15e commit c84e574
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 163 deletions.
102 changes: 26 additions & 76 deletions parity-clib/src/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,31 @@
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.

use std::{mem, ptr};
use std::ffi::c_void;
use std::sync::Arc;
use std::time::Duration;
use std::thread;
use std::os::raw::c_void;

use {parity_config_from_cli, parity_destroy, parity_set_logger, parity_start, parity_unsubscribe_ws, ParityParams, error};
use {Callback, parity_config_from_cli, parity_destroy, parity_rpc_worker, parity_start, parity_set_logger,
parity_unsubscribe_ws, parity_ws_worker, ParityParams};

use futures::{Future, Stream};
use futures::sync::mpsc;
use jni::{JavaVM, JNIEnv};
use jni::objects::{JClass, JString, JObject, JValue, GlobalRef};
use jni::sys::{jlong, jobjectArray, va_list};
use tokio_current_thread::CurrentThread;
use parity_ethereum::{RunningClient, PubSubSession};
use parity_ethereum::RunningClient;

type CheckedQuery<'a> = (&'a RunningClient, String, JavaVM, GlobalRef);

// Creates a Java callback to a static method named `void callback(Object)`
struct Callback<'a> {
struct JavaCallback<'a> {
jvm: JavaVM,
callback: GlobalRef,
method_name: &'a str,
method_descriptor: &'a str,
}

unsafe impl<'a> Send for Callback<'a> {}
unsafe impl<'a> Sync for Callback<'a> {}
impl<'a> Callback<'a> {
unsafe impl<'a> Send for JavaCallback<'a> {}
unsafe impl<'a> Sync for JavaCallback<'a> {}

impl<'a> JavaCallback<'a> {
fn new(jvm: JavaVM, callback: GlobalRef) -> Self {
Self {
jvm,
Expand All @@ -51,7 +48,9 @@ impl<'a> Callback<'a> {
method_descriptor: "(Ljava/lang/Object;)V",
}
}
}

impl<'a> Callback for JavaCallback<'a> {
fn call(&self, msg: &str) {
let env = self.jvm.attach_current_thread().expect("JavaVM should have an environment; qed");
let java_str = env.new_string(msg.to_string()).expect("Rust String is valid JString; qed");
Expand All @@ -63,21 +62,21 @@ impl<'a> Callback<'a> {

#[no_mangle]
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: JNIEnv, _: JClass, cli: jobjectArray) -> jlong {
let cli_len = env.get_array_length(cli).expect("invalid Java bindings");
let cli_len = env.get_array_length(cli).expect("invalid Java bindings") as usize;

let mut jni_strings = Vec::with_capacity(cli_len as usize);
let mut opts = Vec::with_capacity(cli_len as usize);
let mut opts_lens = Vec::with_capacity(cli_len as usize);
let mut jni_strings = Vec::with_capacity(cli_len);
let mut opts = Vec::with_capacity(cli_len);
let mut opts_lens = Vec::with_capacity(cli_len);

for n in 0..cli_len {
for n in 0..cli_len as i32 {
let elem = env.get_object_array_element(cli, n).expect("invalid Java bindings");
let elem_str: JString = elem.into();
match env.get_string(elem_str) {
Ok(s) => {
opts.push(s.as_ptr());
opts_lens.push(s.to_bytes().len());
jni_strings.push(s);
},
}
Err(err) => {
let _ = env.throw_new("java/lang/Exception", err.to_string());
return 0
Expand All @@ -86,7 +85,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env:
}

let mut out = ptr::null_mut();
match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len as usize, &mut out) {
match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len, &mut out) {
0 => out as jlong,
_ => {
let _ = env.throw_new("java/lang/Exception", "failed to create config object");
Expand Down Expand Up @@ -120,7 +119,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_build(
_ => {
let _ = env.throw_new("java/lang/Exception", "failed to start Parity");
0
},
}
}
}

Expand All @@ -129,7 +128,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_destroy(_env: JNIEn
parity_destroy(parity);
}

unsafe fn async_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>)
unsafe fn java_query_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>)
-> Result<CheckedQuery<'a>, String> {
let query: String = env.get_string(rpc)
.map(Into::into)
Expand All @@ -151,26 +150,10 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_rpcQueryNative(
callback: JObject,
)
{
let _ = async_checker(parity, rpc, callback, &env)
let _ = java_query_checker(parity, rpc, callback, &env)
.map(|(client, query, jvm, global_ref)| {
let callback = Arc::new(Callback::new(jvm, global_ref));
let cb = callback.clone();
let future = client.rpc_query(&query, None).map(move |response| {
let response = response.unwrap_or_else(|| error::EMPTY.to_string());
callback.call(&response);
});

let _handle = thread::Builder::new()
.name("rpc_query".to_string())
.spawn(move || {
let mut current_thread = CurrentThread::new();
current_thread.spawn(future);
let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64))
.map_err(|_e| {
cb.call(error::TIMEOUT);
});
})
.expect("rpc-query thread shouldn't fail; qed");
let callback = Arc::new(JavaCallback::new(jvm, global_ref));
parity_rpc_worker(client, &query, callback, timeout_ms as u64);
})
.map_err(|e| {
let _ = env.throw_new("java/lang/Exception", e);
Expand All @@ -186,43 +169,10 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_subscribeWebSocketN
callback: JObject,
) -> va_list {

async_checker(parity, rpc, callback, &env)
java_query_checker(parity, rpc, callback, &env)
.map(move |(client, query, jvm, global_ref)| {
let callback = Arc::new(Callback::new(jvm, global_ref));
let (tx, mut rx) = mpsc::channel(1);
let session = Arc::new(PubSubSession::new(tx));
let weak_session = Arc::downgrade(&session);
let query_future = client.rpc_query(&query, Some(session.clone()));;

let _handle = thread::Builder::new()
.name("ws-subscriber".into())
.spawn(move || {
// Wait for subscription ID
// Note this may block forever and can't be destroyed using the session object
// However, this will likely timeout or be catched the RPC layer
if let Ok(Some(response)) = query_future.wait() {
callback.call(&response);
} else {
callback.call(error::SUBSCRIBE);
return;
};

loop {
for response in rx.by_ref().wait() {
if let Ok(r) = response {
callback.call(&r);
}
}

let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session));
// No subscription left, then terminate
if rc <= 1 {
break;
}
}
})
.expect("rpc-subscriber thread shouldn't fail; qed");
Arc::into_raw(session) as va_list
let callback = Arc::new(JavaCallback::new(jvm, global_ref));
parity_ws_worker(client, &query, callback) as va_list
})
.unwrap_or_else(|e| {
let _ = env.throw_new("java/lang/Exception", e);
Expand Down
Loading

0 comments on commit c84e574

Please sign in to comment.