Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IPQS & Focsec webhook #81

Merged
merged 9 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions ory-actions/vpncheck-py/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Ory Action to check IP addresses against vpnapi.io
# Ory Action to check IP addresses against VPN and Fraud detection services

This is an example Action (webhook) to check client IP addresses against
vpnapi.com and block requests
security services like focsec.com, vpnapi.com and ipqs.com and block requests

- coming from TOR clients
- coming from known VPNs
- coming from certain geographies (in this example: RU)
- coming from certain geographies

It's intended for use as a post-login Action on Ory Network and returns a
message that can be parsed by Ory and displayed to the user.
Expand All @@ -19,7 +19,7 @@ Cloud Functions, and can be adapted for different scenarios.

- A Google Cloud project with Cloud Functions active (or an alternate way to
deploy)
- A vpnapi.com account
- An account with focsec.com, ipqs.com or vpnapi.com
- python 3.9+ with flask, requests, google cloud logging

To install dependencies, run e.g.
Expand All @@ -33,15 +33,17 @@ pip3 install google-cloud-logging

```bash
export BEARER_TOKEN=SOME_SECRET_API_KEY_FOR_YOUR_WEBHOOK;
# Set the API Key for the service you use
export VPNAPIIO_API_KEY=YOUR_VPNAPI_KEY;
python3 main.py
export FOCSEC_API_KEY=YOUR_FOCSEC_KEY;
export IPQS_API_KEY=YOUR_IPQS_KEY;
```

### Run locally

```bash
cd ory-actions/vpncheck-py
python3 main.py
python3 focsec.py # or vpnapi.py or ipqs.py
```

#### Send a sample request
Expand Down Expand Up @@ -69,7 +71,8 @@ After setting up your GCP project (see, for example,
you can deploy the Action as a cloud function:

```bash
gcloud functions deploy vpncheck --runtime python39 --trigger-http --allow-unauthenticated --set-env-vars BEARER_TOKEN=$SOME_SECRET_API_KEY_FOR_YOUR_WEBHOOK,VPNAPIIO_API_KEY=$VPNAPIIO_API_KEY,ENABLE_CLOUD_LOGGING=true --source=.
cp focsec.py main.py # Cloud functions like a main.py, so copy the implementation you're adopting there
gcloud functions deploy vpncheck --runtime python39 --trigger-http --allow-unauthenticated --set-env-vars BEARER_TOKEN=$SOME_SECRET_API_KEY_FOR_YOUR_WEBHOOK,VPNAPIIO_API_KEY=$VPNAPIIO_API_KEY,IPQS_API_KEY=$IPQS_API_KEY,ENABLE_CLOUD_LOGGING=true --source=.
```

Note: You may need to create a `venv` for dependencies to load correctly.
Expand Down
96 changes: 96 additions & 0 deletions ory-actions/vpncheck-py/focsec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright © 2023 Ory Corp
# SPDX-License-Identifier: Apache-2.0

from flask import Flask, request, jsonify
import requests

import os

# load google cloud logging if running on GCP
if os.getenv('ENABLE_CLOUD_LOGGING', ''):
# set up the Google Cloud Logging python client library
import google.cloud.logging
client = google.cloud.logging.Client()
client.setup_logging()

# use Python’s standard logging library to send logs to GCP
import logging

app = Flask(__name__)

# Define the bearer token for authentication
BEARER_TOKEN = os.environ.get("BEARER_TOKEN")
FOCSEC_API_KEY = os.environ.get("FOCSEC_API_KEY")

if not BEARER_TOKEN or not FOCSEC_API_KEY:
raise ValueError("BEARER_TOKEN or FOCSEC_API_KEY not set in environment variables.")

@app.route("/vpncheck", methods=["POST"])
def handle_vpncheck():
return vpncheck(request)

def vpncheck(request):
# Check for bearer token authentication
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return jsonify({"error": "Unauthorized"}), 401

provided_token = auth_header.split("Bearer ")[1]
if provided_token != BEARER_TOKEN:
return jsonify({"error": "Unauthorized"}), 401

# Parse the JSON payload and extract the IP address
data = request.get_json()
logging.info(f"request: {data}")
ip_address = data.get("ip_address")
if not ip_address:
return error_response("Cannot determine Client IP address")

# Call vpnapi.io to check the IP address
# if the API fails, we permit by default
try:
vpn_result = query_focsec(ip_address)
except Exception as e:
return jsonify({"warning": "Unable to check VPN: ", "details": str(e)}), 200

# Check the response from focsec
if "is_vpn" in vpn_result and vpn_result["is_vpn"] == True:
logging.info(f"Blocked: VPN")
return error_response("Request blocked: VPN")
if "is_tor" in vpn_result and vpn_result["is_tor"] == True:
logging.info(f"Blocked: Tor")
return error_response("Request blocked: Tor")

if (
"iso_code" in vpn_result
and vpn_result["iso_code"] == "ru"
):
logging.info(f"geoblock: {vpn_result['iso_code']}")
return error_response("Request blocked: Geolocation")

# Return the result as success or error details
return jsonify(vpn_result), 200


def error_response(msg):
return jsonify({"messages": [{ "messages": [{ "text": msg }] }]}), 400


def query_focsec(ip_address):
# Implement the logic to call focsec.com and retrieve the result
# You can use libraries like requests or httpx for making HTTP requests
# Return the response as a dictionary
# For example:

url = f"https://api.focsec.com/v1/ip/{ip_address}?api_key={FOCSEC_API_KEY}"
response = requests.get(url, timeout=1.5)
if response.status_code != 200:
raise Exception(f"vpnapi.io returned {response.status_code}")

result = response.json()

return result


if __name__ == "__main__":
app.run()
115 changes: 115 additions & 0 deletions ory-actions/vpncheck-py/ipqs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright © 2023 Ory Corp
# SPDX-License-Identifier: Apache-2.0

from flask import Flask, request, jsonify
import requests

import os

# load google cloud logging if running on GCP
if os.getenv('ENABLE_CLOUD_LOGGING', ''):
# set up the Google Cloud Logging python client library
import google.cloud.logging
client = google.cloud.logging.Client()
client.setup_logging()

# use Python’s standard logging library to send logs to GCP
import logging

app = Flask(__name__)

# Define the bearer token for authentication
BEARER_TOKEN = os.environ.get("BEARER_TOKEN")
IPQS_API_KEY = os.environ.get("IPQS_API_KEY")

if not BEARER_TOKEN or not IPQS_API_KEY:
raise ValueError("BEARER_TOKEN or IPQS_API_KEY not set in environment variables.")

@app.route("/vpncheck", methods=["POST"])
def handle_vpncheck():
return vpncheck(request)

def vpncheck(request):
# Check for bearer token authentication
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return jsonify({"error": "Unauthorized"}), 401

provided_token = auth_header.split("Bearer ")[1]
if provided_token != BEARER_TOKEN:
return jsonify({"error": "Unauthorized"}), 401

# Parse the JSON payload and extract the IP address
data = request.get_json()
logging.info(f"request: {data}")
ip_address = data.get("ip_address")
if not ip_address:
return error_response("Cannot determine Client IP address")

# Call vpnapi.io to check the IP address
# if the API fails, we permit by default
try:
vpn_result = query_ipqs(ip_address)
except Exception as e:
return jsonify({"warning": "Unable to check VPN: ", "details": str(e)}), 200

# Check the response from IPQS - based on example from https://www.ipqualityscore.com/documentation/proxy-detection/overview

if not 'success' in vpn_result or vpn_result['success'] == False:
logging.info(f"IPQS failed, result: {vpn_result}")
return error_response("Can't verify IP address")

if "fraud_score" in vpn_result and vpn_result["fraud_score"] >= 80:
logging.info(f"Blocked: Fraud score {vpn_result['fraud_score']}")
return error_response("Authentication failed: IP address cannot be verified")
if "vpn" in vpn_result and vpn_result["vpn"] == True:
logging.info(f"Blocked: VPN")
return error_response("Authentication failed: Please disable your VPN.")
if "tor" in vpn_result and vpn_result["tor"] == True:
logging.info(f"Blocked: Tor")
return error_response("Authentication failed: Please disable Tor.")

if (
"iso_code" in vpn_result
and vpn_result["iso_code"] == "ru"
):
logging.info(f"geoblock: {vpn_result['iso_code']}")
return error_response("Request blocked: Geolocation")

# Return the result as success or error details
return jsonify(vpn_result), 200

def error_response(msg):
return jsonify({"messages": [{ "messages": [{ "text": msg }] }]}), 400


def query_ipqs(ip_address):
# Implement the logic to call ipqs.com and retrieve the result
# You can use libraries like requests or httpx for making HTTP requests
# Return the response as a dictionary
# For example:

ipqs_parameters = {
#'user_agent' : header_items['User-Agent'],
#'user_language' : header_items['Accept-Language'].split(',')[0],
'strictness' : 1,
# You may want to allow public access points like coffee shops, schools, corporations, etc...
'allow_public_access_points' : 'true',
# Reduce scoring penalties for mixed quality IP addresses shared by good and bad users.
'lighter_penalties' : 'false'
}


url = f"https://www.ipqualityscore.com/api/json/ip/{IPQS_API_KEY}/{ip_address}"
response = requests.get(url, params = ipqs_parameters, timeout = 1.5)


if response.status_code != 200:
raise Exception(f"IPQS returned {response.status_code}")

result = response.json()
return result


if __name__ == "__main__":
app.run()
File renamed without changes.
Loading