diff --git a/ory-actions/vpncheck-py/README.md b/ory-actions/vpncheck-py/README.md index cf28292..3ad0172 100644 --- a/ory-actions/vpncheck-py/README.md +++ b/ory-actions/vpncheck-py/README.md @@ -1,11 +1,11 @@ -# Ory Action to check IP addresses against vpnapi.io +# Ory Action to check IP addresses against VPN and Fraud detection services This is an example Action (webhook) to check client IP addresses against -vpnapi.com and block requests +security services like focsec.com, vpnapi.com and ipqs.com and block requests - coming from TOR clients - coming from known VPNs -- coming from certain geographies (in this example: RU) +- coming from certain geographies It's intended for use as a post-login Action on Ory Network and returns a message that can be parsed by Ory and displayed to the user. @@ -19,7 +19,7 @@ Cloud Functions, and can be adapted for different scenarios. - A Google Cloud project with Cloud Functions active (or an alternate way to deploy) -- A vpnapi.com account +- An account with focsec.com, ipqs.com or vpnapi.com - python 3.9+ with flask, requests, google cloud logging To install dependencies, run e.g. @@ -33,15 +33,17 @@ pip3 install google-cloud-logging ```bash export BEARER_TOKEN=SOME_SECRET_API_KEY_FOR_YOUR_WEBHOOK; +# Set the API Key for the service you use export VPNAPIIO_API_KEY=YOUR_VPNAPI_KEY; -python3 main.py +export FOCSEC_API_KEY=YOUR_FOCSEC_KEY; +export IPQS_API_KEY=YOUR_IPQS_KEY; ``` ### Run locally ```bash cd ory-actions/vpncheck-py -python3 main.py +python3 focsec.py # or vpnapi.py or ipqs.py ``` #### Send a sample request @@ -69,7 +71,8 @@ After setting up your GCP project (see, for example, you can deploy the Action as a cloud function: ```bash -gcloud functions deploy vpncheck --runtime python39 --trigger-http --allow-unauthenticated --set-env-vars BEARER_TOKEN=$SOME_SECRET_API_KEY_FOR_YOUR_WEBHOOK,VPNAPIIO_API_KEY=$VPNAPIIO_API_KEY,ENABLE_CLOUD_LOGGING=true --source=. +cp focsec.py main.py # Cloud functions like a main.py, so copy the implementation you're adopting there +gcloud functions deploy vpncheck --runtime python39 --trigger-http --allow-unauthenticated --set-env-vars BEARER_TOKEN=$SOME_SECRET_API_KEY_FOR_YOUR_WEBHOOK,VPNAPIIO_API_KEY=$VPNAPIIO_API_KEY,IPQS_API_KEY=$IPQS_API_KEY,ENABLE_CLOUD_LOGGING=true --source=. ``` Note: You may need to create a `venv` for dependencies to load correctly. diff --git a/ory-actions/vpncheck-py/focsec.py b/ory-actions/vpncheck-py/focsec.py new file mode 100644 index 0000000..744b66d --- /dev/null +++ b/ory-actions/vpncheck-py/focsec.py @@ -0,0 +1,96 @@ +# Copyright © 2023 Ory Corp +# SPDX-License-Identifier: Apache-2.0 + +from flask import Flask, request, jsonify +import requests + +import os + +# load google cloud logging if running on GCP +if os.getenv('ENABLE_CLOUD_LOGGING', ''): + # set up the Google Cloud Logging python client library + import google.cloud.logging + client = google.cloud.logging.Client() + client.setup_logging() + +# use Python’s standard logging library to send logs to GCP +import logging + +app = Flask(__name__) + +# Define the bearer token for authentication +BEARER_TOKEN = os.environ.get("BEARER_TOKEN") +FOCSEC_API_KEY = os.environ.get("FOCSEC_API_KEY") + +if not BEARER_TOKEN or not FOCSEC_API_KEY: + raise ValueError("BEARER_TOKEN or FOCSEC_API_KEY not set in environment variables.") + +@app.route("/vpncheck", methods=["POST"]) +def handle_vpncheck(): + return vpncheck(request) + +def vpncheck(request): + # Check for bearer token authentication + auth_header = request.headers.get("Authorization") + if not auth_header or not auth_header.startswith("Bearer "): + return jsonify({"error": "Unauthorized"}), 401 + + provided_token = auth_header.split("Bearer ")[1] + if provided_token != BEARER_TOKEN: + return jsonify({"error": "Unauthorized"}), 401 + + # Parse the JSON payload and extract the IP address + data = request.get_json() + logging.info(f"request: {data}") + ip_address = data.get("ip_address") + if not ip_address: + return error_response("Cannot determine Client IP address") + + # Call vpnapi.io to check the IP address + # if the API fails, we permit by default + try: + vpn_result = query_focsec(ip_address) + except Exception as e: + return jsonify({"warning": "Unable to check VPN: ", "details": str(e)}), 200 + + # Check the response from focsec + if "is_vpn" in vpn_result and vpn_result["is_vpn"] == True: + logging.info(f"Blocked: VPN") + return error_response("Request blocked: VPN") + if "is_tor" in vpn_result and vpn_result["is_tor"] == True: + logging.info(f"Blocked: Tor") + return error_response("Request blocked: Tor") + + if ( + "iso_code" in vpn_result + and vpn_result["iso_code"] == "ru" + ): + logging.info(f"geoblock: {vpn_result['iso_code']}") + return error_response("Request blocked: Geolocation") + + # Return the result as success or error details + return jsonify(vpn_result), 200 + + +def error_response(msg): + return jsonify({"messages": [{ "messages": [{ "text": msg }] }]}), 400 + + +def query_focsec(ip_address): + # Implement the logic to call focsec.com and retrieve the result + # You can use libraries like requests or httpx for making HTTP requests + # Return the response as a dictionary + # For example: + + url = f"https://api.focsec.com/v1/ip/{ip_address}?api_key={FOCSEC_API_KEY}" + response = requests.get(url, timeout=1.5) + if response.status_code != 200: + raise Exception(f"vpnapi.io returned {response.status_code}") + + result = response.json() + + return result + + +if __name__ == "__main__": + app.run() diff --git a/ory-actions/vpncheck-py/ipqs.py b/ory-actions/vpncheck-py/ipqs.py new file mode 100644 index 0000000..c5ad96b --- /dev/null +++ b/ory-actions/vpncheck-py/ipqs.py @@ -0,0 +1,115 @@ +# Copyright © 2023 Ory Corp +# SPDX-License-Identifier: Apache-2.0 + +from flask import Flask, request, jsonify +import requests + +import os + +# load google cloud logging if running on GCP +if os.getenv('ENABLE_CLOUD_LOGGING', ''): + # set up the Google Cloud Logging python client library + import google.cloud.logging + client = google.cloud.logging.Client() + client.setup_logging() + +# use Python’s standard logging library to send logs to GCP +import logging + +app = Flask(__name__) + +# Define the bearer token for authentication +BEARER_TOKEN = os.environ.get("BEARER_TOKEN") +IPQS_API_KEY = os.environ.get("IPQS_API_KEY") + +if not BEARER_TOKEN or not IPQS_API_KEY: + raise ValueError("BEARER_TOKEN or IPQS_API_KEY not set in environment variables.") + +@app.route("/vpncheck", methods=["POST"]) +def handle_vpncheck(): + return vpncheck(request) + +def vpncheck(request): + # Check for bearer token authentication + auth_header = request.headers.get("Authorization") + if not auth_header or not auth_header.startswith("Bearer "): + return jsonify({"error": "Unauthorized"}), 401 + + provided_token = auth_header.split("Bearer ")[1] + if provided_token != BEARER_TOKEN: + return jsonify({"error": "Unauthorized"}), 401 + + # Parse the JSON payload and extract the IP address + data = request.get_json() + logging.info(f"request: {data}") + ip_address = data.get("ip_address") + if not ip_address: + return error_response("Cannot determine Client IP address") + + # Call vpnapi.io to check the IP address + # if the API fails, we permit by default + try: + vpn_result = query_ipqs(ip_address) + except Exception as e: + return jsonify({"warning": "Unable to check VPN: ", "details": str(e)}), 200 + + # Check the response from IPQS - based on example from https://www.ipqualityscore.com/documentation/proxy-detection/overview + + if not 'success' in vpn_result or vpn_result['success'] == False: + logging.info(f"IPQS failed, result: {vpn_result}") + return error_response("Can't verify IP address") + + if "fraud_score" in vpn_result and vpn_result["fraud_score"] >= 80: + logging.info(f"Blocked: Fraud score {vpn_result['fraud_score']}") + return error_response("Authentication failed: IP address cannot be verified") + if "vpn" in vpn_result and vpn_result["vpn"] == True: + logging.info(f"Blocked: VPN") + return error_response("Authentication failed: Please disable your VPN.") + if "tor" in vpn_result and vpn_result["tor"] == True: + logging.info(f"Blocked: Tor") + return error_response("Authentication failed: Please disable Tor.") + + if ( + "iso_code" in vpn_result + and vpn_result["iso_code"] == "ru" + ): + logging.info(f"geoblock: {vpn_result['iso_code']}") + return error_response("Request blocked: Geolocation") + + # Return the result as success or error details + return jsonify(vpn_result), 200 + +def error_response(msg): + return jsonify({"messages": [{ "messages": [{ "text": msg }] }]}), 400 + + +def query_ipqs(ip_address): + # Implement the logic to call ipqs.com and retrieve the result + # You can use libraries like requests or httpx for making HTTP requests + # Return the response as a dictionary + # For example: + + ipqs_parameters = { + #'user_agent' : header_items['User-Agent'], + #'user_language' : header_items['Accept-Language'].split(',')[0], + 'strictness' : 1, + # You may want to allow public access points like coffee shops, schools, corporations, etc... + 'allow_public_access_points' : 'true', + # Reduce scoring penalties for mixed quality IP addresses shared by good and bad users. + 'lighter_penalties' : 'false' + } + + + url = f"https://www.ipqualityscore.com/api/json/ip/{IPQS_API_KEY}/{ip_address}" + response = requests.get(url, params = ipqs_parameters, timeout = 1.5) + + + if response.status_code != 200: + raise Exception(f"IPQS returned {response.status_code}") + + result = response.json() + return result + + +if __name__ == "__main__": + app.run() diff --git a/ory-actions/vpncheck-py/main.py b/ory-actions/vpncheck-py/vpnapi.py similarity index 100% rename from ory-actions/vpncheck-py/main.py rename to ory-actions/vpncheck-py/vpnapi.py