Loading...
Loading...
Write Python code in n8n Code nodes. Use when writing Python in n8n, using _input/_json/_node syntax, working with standard library, or need to understand Python limitations in n8n Code nodes.
npx skill4agent add promptadvisers/n8n-powerhouse n8n-code-python

# Basic template for Python Code nodes
from datetime import datetime

items = _input.all()
# Process data
processed = []
for item in items:
    processed.append({
        "json": {
            **item["json"],
            "processed": True,
            "timestamp": datetime.now().isoformat()
        }
    })
return processed

`_input.all()` `_input.first()` `_input.item` `[{"json": {...}}]` `_json["body"]` `_json` `_input.all()` `_items`

# Example: Calculate total from all items
all_items = _input.all()
total = sum(item["json"].get("amount", 0) for item in all_items)
return [{
    "json": {
        "total": total,
        "count": len(all_items),
        "average": total / len(all_items) if all_items else 0
    }
}]

`_input.item` `_item`

# Example: Add processing timestamp to each item
from datetime import datetime

item = _input.item
return [{
    "json": {
        **item["json"],
        "processed": True,
        "processed_at": datetime.now().isoformat()
    }
}]

`_input` `_json` `_node` `_now` `_today` `_jmespath()` `from datetime import datetime`

# Python (Beta) example
items = _input.all()
now = _now # Built-in datetime object
return [{
    "json": {
        "count": len(items),
        "timestamp": now.isoformat()
    }
}]

`_items` `_item` `_input` `_now`

# Python (Native) example
processed = []
for item in _items:
    processed.append({
        "json": {
            "id": item["json"].get("id"),
            "processed": True
        }
    })
return processed

# Get all items from previous node
all_items = _input.all()
# Filter, transform as needed
valid = [item for item in all_items if item["json"].get("status") == "active"]
processed = []
for item in valid:
    processed.append({
        "json": {
            "id": item["json"]["id"],
            "name": item["json"]["name"]
        }
    })
return processed

# Get first item only
from datetime import datetime

first_item = _input.first()
data = first_item["json"]
return [{
    "json": {
        "result": process_data(data),
        "processed_at": datetime.now().isoformat()
    }
}]

# Current item in loop (Each Item mode only)
current_item = _input.item
return [{
    "json": {
        **current_item["json"],
        "item_processed": True
    }
}]

# Get output from specific node
webhook_data = _node["Webhook"]["json"]
http_data = _node["HTTP Request"]["json"]
return [{
    "json": {
        "combined": {
            "webhook": webhook_data,
            "api": http_data
        }
    }
}]

`["body"]`

# ❌ WRONG - Will raise KeyError
name = _json["name"]
email = _json["email"]
# ✅ CORRECT - Webhook data is under ["body"]
name = _json["body"]["name"]
email = _json["body"]["email"]
# ✅ SAFER - Use .get() for safe access
webhook_data = _json.get("body", {})
name = webhook_data.get("name")

`body` `"json"`

# ✅ Single result
return [{
    "json": {
        "field1": value1,
        "field2": value2
    }
}]
# ✅ Multiple results
return [
    {"json": {"id": 1, "data": "first"}},
    {"json": {"id": 2, "data": "second"}}
]
# ✅ List comprehension
transformed = [
    {"json": {"id": item["json"]["id"], "processed": True}}
    for item in _input.all()
    if item["json"].get("valid")
]
return transformed
# ✅ Empty result (when no data to return)
return []
# ✅ Conditional return
if should_process:
    return [{"json": processed_data}]
else:
    return []

# ❌ WRONG: Dictionary without list wrapper
return {
    "json": {"field": value}
}
# ❌ WRONG: List without json wrapper
return [{"field": value}]
# ❌ WRONG: Plain string
return "processed"
# ❌ WRONG: Incomplete structure
return [{"data": value}] # Should be {"json": value}

# ❌ NOT AVAILABLE - Will raise ModuleNotFoundError
import requests # ❌ No
import pandas # ❌ No
import numpy # ❌ No
import scipy # ❌ No
from bs4 import BeautifulSoup # ❌ No
import lxml # ❌ No

# ✅ AVAILABLE - Standard library only
import json # ✅ JSON parsing
import datetime # ✅ Date/time operations
import re # ✅ Regular expressions
import base64 # ✅ Base64 encoding/decoding
import hashlib # ✅ Hashing functions
import urllib.parse # ✅ URL parsing
import math # ✅ Math functions
import random # ✅ Random numbers
import statistics # ✅ Statistical functions

`$helpers.httpRequest()`

items = _input.all()
return [
    {
        "json": {
            "id": item["json"].get("id"),
            "name": item["json"].get("name", "Unknown").upper(),
            "processed": True
        }
    }
    for item in items
]

items = _input.all()
total = sum(item["json"].get("amount", 0) for item in items)
valid_items = [item for item in items if item["json"].get("amount", 0) > 0]
return [{
    "json": {
        "total": total,
        "count": len(valid_items)
    }
}]

import re

items = _input.all()
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
all_emails = []
for item in items:
    text = item["json"].get("text", "")
    emails = re.findall(email_pattern, text)
    all_emails.extend(emails)
# Remove duplicates
unique_emails = list(set(all_emails))
return [{
    "json": {
        "emails": unique_emails,
        "count": len(unique_emails)
    }
}]

items = _input.all()
validated = []
for item in items:
    data = item["json"]
    errors = []
    # Validate fields
    if not data.get("email"):
        errors.append("Email required")
    if not data.get("name"):
        errors.append("Name required")
    validated.append({
        "json": {
            **data,
            "valid": len(errors) == 0,
            "errors": errors if errors else None
        }
    })
return validated

from statistics import mean, median, stdev

items = _input.all()
values = [item["json"].get("value", 0) for item in items if "value" in item["json"]]
if values:
    return [{
        "json": {
            "mean": mean(values),
            "median": median(values),
            "stdev": stdev(values) if len(values) > 1 else 0,
            "min": min(values),
            "max": max(values),
            "count": len(values)
        }
    }]
else:
    return [{"json": {"error": "No values found"}}]

# ❌ WRONG: Trying to import external library
import requests # ModuleNotFoundError!
# ✅ CORRECT: Use HTTP Request node or JavaScript
# Add HTTP Request node before Code node
# OR switch to JavaScript and use $helpers.httpRequest()

# ❌ WRONG: No return statement
items = _input.all()
# Processing...
# Forgot to return!
# ✅ CORRECT: Always return data
items = _input.all()
# Processing...
return [{"json": item["json"]} for item in items]

# ❌ WRONG: Returning dict instead of list
return {"json": {"result": "success"}}
# ✅ CORRECT: List wrapper required
return [{"json": {"result": "success"}}]

# ❌ WRONG: Direct access crashes if missing
name = _json["user"]["name"] # KeyError!
# ✅ CORRECT: Use .get() for safe access
name = _json.get("user", {}).get("name", "Unknown")

# ❌ WRONG: Direct access to webhook data
email = _json["email"] # KeyError!
# ✅ CORRECT: Webhook data under ["body"]
email = _json["body"]["email"]
# ✅ BETTER: Safe access with .get()
email = _json.get("body", {}).get("email", "no-email")

# JSON operations
import json
data = json.loads(json_string)
json_output = json.dumps({"key": "value"})
# Date/time
from datetime import datetime, timedelta
now = datetime.now()
tomorrow = now + timedelta(days=1)
formatted = now.strftime("%Y-%m-%d")
# Regular expressions
import re
matches = re.findall(r'\d+', text)
cleaned = re.sub(r'[^\w\s]', '', text)
# Base64 encoding
import base64
encoded = base64.b64encode(text.encode()).decode()
decoded = base64.b64decode(encoded)
# Hashing
import hashlib
hash_value = hashlib.sha256(text.encode()).hexdigest()
# URL parsing
import urllib.parse
params = urllib.parse.urlencode({"key": "value"})
parsed = urllib.parse.urlparse(url)
# Statistics
from statistics import mean, median, stdev
average = mean([1, 2, 3, 4, 5])

# ✅ SAFE: Won't crash if field missing
value = item["json"].get("field", "default")
# ❌ RISKY: Crashes if field doesn't exist
value = item["json"]["field"]

# ✅ GOOD: Default to 0 if None
amount = item["json"].get("amount") or 0
# ✅ GOOD: Check for None explicitly
text = item["json"].get("text")
if text is None:
    text = ""

# ✅ PYTHONIC: List comprehension
valid = [item for item in items if item["json"].get("active")]
# ❌ VERBOSE: Manual loop
valid = []
for item in items:
    if item["json"].get("active"):
        valid.append(item)

# ✅ CONSISTENT: Always list with "json" key
return [{"json": result}] # Single result
return results # Multiple results (already formatted)
return [] # No results

# Debug statements appear in browser console (F12)
items = _input.all()
print(f"Processing {len(items)} items")
print(f"First item: {items[0] if items else 'None'}")

`statistics` `{{ }}` `{{ }}` `search_nodes({query: "code"})` `get_node_essentials("nodes-base.code")` `validate_node_operation()` `{"json": {...}}` `_input.all()` `_input.first()` `_input.item` `.get()` `["body"]`