diff --git a/src/service/notification.rs b/src/service/notification.rs index 277c06260ee079174b8ecd2a92c74446c8b91094..21563d703a3c29ae7f388187715307a0818b5966 100644 --- a/src/service/notification.rs +++ b/src/service/notification.rs @@ -18,42 +18,85 @@ impl NotificationService { return thread::spawn(move || Self::subscribe_request(product_type_clone)) .join().unwrap(); } -} -#[tokio::main] -async fn subscribe_request(product_type: String) -> Result<SubscriberRequest> { - let product_type_upper: String = product_type.to_uppercase(); - let product_type_str: &str = product_type_upper.as_str(); - let notification_receiver_url: String = format!("{}/receive", - APP_CONFIG.get_instance_root_url()); - let payload: SubscriberRequest = SubscriberRequest { - name: APP_CONFIG.get_instance_name().to_string(), - url: notification_receiver_url, - }; - - let request_url: String = format!("{}/notification/subscribe/{}", - APP_CONFIG.get_publisher_root_url(), product_type_str); - let request = REQWEST_CLIENT - .post(request_url.clone()) - .header("content-type", "application/json") - .header("Accept", "application/json") - .body(to_string(&payload).unwrap()) - .send() - .await; - - log::warn_!("Sent subscribe request to: {}", request_url); - - return match request { - Ok(f) => match f.json::<SubscriberRequest>().await { - Ok(x) => Ok(x), - Err(_) => Err(compose_error_response( - Status::NotAcceptable, - "Failed to parse response".to_string(), + #[tokio::main] + async fn subscribe_request(product_type: String) -> Result<SubscriberRequest> { + let product_type_upper: String = product_type.to_uppercase(); + let product_type_str: &str = product_type_upper.as_str(); + let notification_receiver_url: String = format!("{}/receive", + APP_CONFIG.get_instance_root_url()); + let payload: SubscriberRequest = SubscriberRequest { + name: APP_CONFIG.get_instance_name().to_string(), + url: notification_receiver_url, + }; + + let request_url: String = format!("{}/notification/subscribe/{}", + APP_CONFIG.get_publisher_root_url(), product_type_str); + let request = REQWEST_CLIENT + .post(request_url.clone()) + .header("content-type", "application/json") + .header("Accept", "application/json") + .body(to_string(&payload).unwrap()) + .send() + .await; + + log::warn_!("Sent subscribe request to: {}", request_url); + + return match request { + Ok(f) => match f.json::<SubscriberRequest>().await { + Ok(x) => Ok(x), + Err(_) => Err(compose_error_response( + Status::NotAcceptable, + "Failed to parse response".to_string(), + )) + }, + Err(e) => Err(compose_error_response( + Status::NotFound, + e.to_string(), + )), + }; + } + + pub fn unsubscribe(product_type: &str) -> Result<SubscriberRequest> { + let product_type_clone = String::from(product_type); + return thread::spawn(move || Self::unsubscribe_request(product_type_clone)) + .join().unwrap(); + } + + #[tokio::main] + async fn unsubscribe_request(product_type: String) -> Result<SubscriberRequest> { + let product_type_upper: String = product_type.to_uppercase(); + let product_type_str: &str = product_type_upper.as_str(); + let notification_receiver_url: String = format!("{}/receive", + APP_CONFIG.get_instance_root_url()); + let request_url: String = format!("{}/notification/unsubscribe/{}?url={}", + APP_CONFIG.get_publisher_root_url(), + product_type_str, + notification_receiver_url); + + let request = REQWEST_CLIENT + .post(request_url.clone()) + .header("content-type", "application/json") + .header("Accept", "application/json") + .send() + .await; + log::warn_!("Sent unsubscribe request to: {}", request_url); + + return match request { + Ok(f) => { + match f.json::<SubscriberRequest>().await { + Ok(x) => Ok(x), + Err(_) => Err(compose_error_response( + Status::NotFound, + String::from("Already unsubscribed to the topic.") + )) + } + } + Err(e) => Err(compose_error_response( + Status::NotFound, + e.to_string() )) - }, - Err(e) => Err(compose_error_response( - Status::NotFound, - e.to_string(), - )), - }; -} \ No newline at end of file + }; + } +} +