# MiniStack AWS Emulator

Skill by ara.so — Daily 2026 Skills collection.
MiniStack is a free, MIT-licensed drop-in replacement for LocalStack that emulates 25+ AWS services (S3, SQS, DynamoDB, Lambda, SNS, IAM, STS, Kinesis, EventBridge, SecretsManager, SSM, CloudWatch, SES, and more) on a single port (`4566`). No account, no API key, no telemetry. Works with `boto3`, the AWS CLI, Terraform, CDK, and any SDK.

## Installation
### Option 1: PyPI (simplest)
```bash
pip install ministack
ministack
```

Server runs at http://localhost:4566.
Change port: `GATEWAY_PORT=5000 ministack`
### Option 2: Docker Hub
```bash
docker run -p 4566:4566 nahuelnucera/ministack
```

### Option 3: Docker Compose (from source)
```bash
git clone https://github.com/Nahuel990/ministack
cd ministack
docker compose up -d
```

### Verify it's running
```bash
curl http://localhost:4566/_localstack/health
```

## Configuration
| Environment Variable | Default | Description |
|---|---|---|
| `GATEWAY_PORT` | `4566` | Port to listen on |
| `S3_PERSIST` | unset | Set to `1` to persist S3 data across restarts |
## AWS CLI Usage
```bash
# Set credentials (any non-empty values work)
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
export AWS_DEFAULT_REGION=us-east-1

# S3
aws --endpoint-url=http://localhost:4566 s3 mb s3://my-bucket
aws --endpoint-url=http://localhost:4566 s3 cp ./file.txt s3://my-bucket/
aws --endpoint-url=http://localhost:4566 s3 ls s3://my-bucket

# SQS
aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name my-queue
aws --endpoint-url=http://localhost:4566 sqs list-queues

# DynamoDB
aws --endpoint-url=http://localhost:4566 dynamodb list-tables
aws --endpoint-url=http://localhost:4566 dynamodb create-table \
  --table-name Users \
  --attribute-definitions AttributeName=userId,AttributeType=S \
  --key-schema AttributeName=userId,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST

# STS (identity check)
aws --endpoint-url=http://localhost:4566 sts get-caller-identity

# Use a named profile instead
aws configure --profile local
# Enter: test / test / us-east-1 / json
aws --profile local --endpoint-url=http://localhost:4566 s3 ls
```

### awslocal wrapper (from source)
```bash
chmod +x bin/awslocal
./bin/awslocal s3 ls
./bin/awslocal dynamodb list-tables
```

## boto3 Usage Patterns
### Universal client factory
```python
import boto3

ENDPOINT = "http://localhost:4566"

def aws_client(service: str):
    return boto3.client(
        service,
        endpoint_url=ENDPOINT,
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )

def aws_resource(service: str):
    return boto3.resource(
        service,
        endpoint_url=ENDPOINT,
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )
```

### S3
```python
s3 = aws_client("s3")

# Create bucket and upload
s3.create_bucket(Bucket="my-bucket")
s3.put_object(Bucket="my-bucket", Key="hello.txt", Body=b"Hello, MiniStack!")

# Download
obj = s3.get_object(Bucket="my-bucket", Key="hello.txt")
print(obj["Body"].read())  # b'Hello, MiniStack!'

# List objects
response = s3.list_objects_v2(Bucket="my-bucket")
for item in response.get("Contents", []):
    print(item["Key"])

# Copy object
s3.copy_object(
    Bucket="my-bucket",
    CopySource={"Bucket": "my-bucket", "Key": "hello.txt"},
    Key="hello-copy.txt",
)

# Enable versioning
s3.put_bucket_versioning(
    Bucket="my-bucket",
    VersioningConfiguration={"Status": "Enabled"},
)

# Presigned URL (works locally)
url = s3.generate_presigned_url(
    "get_object",
    Params={"Bucket": "my-bucket", "Key": "hello.txt"},
    ExpiresIn=3600,
)
```
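Because presigned URLs are honored locally, one can be exercised end to end with nothing but the standard library. A minimal sketch, assuming the bucket, object, and `url` from the block above:

```python
# Fetch the presigned URL with the standard library; no SDK involved.
# Assumes `url` from the preceding block and a running MiniStack.
import urllib.request

with urllib.request.urlopen(url) as resp:
    print(resp.status)  # 200
    print(resp.read())  # b'Hello, MiniStack!'
```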
### SQS
```python
import json

sqs = aws_client("sqs")

# Standard queue
queue = sqs.create_queue(QueueName="my-queue")
queue_url = queue["QueueUrl"]
sqs.send_message(QueueUrl=queue_url, MessageBody='{"event": "user_signup"}')
messages = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)
for msg in messages.get("Messages", []):
    print(msg["Body"])
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])

# FIFO queue
fifo = sqs.create_queue(
    QueueName="my-queue.fifo",
    Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)

# Dead-letter queue setup
dlq = sqs.create_queue(QueueName="my-dlq")
dlq_attrs = sqs.get_queue_attributes(
    QueueUrl=dlq["QueueUrl"], AttributeNames=["QueueArn"]
)
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_attrs["Attributes"]["QueueArn"],
            "maxReceiveCount": "3",
        })
    },
)
```
### DynamoDB
```python
ddb = aws_client("dynamodb")

# Create table
ddb.create_table(
    TableName="Users",
    KeySchema=[
        {"AttributeName": "userId", "KeyType": "HASH"},
        {"AttributeName": "createdAt", "KeyType": "RANGE"},
    ],
    AttributeDefinitions=[
        {"AttributeName": "userId", "AttributeType": "S"},
        {"AttributeName": "createdAt", "AttributeType": "N"},
    ],
    BillingMode="PAY_PER_REQUEST",
)

# Put / Get / Delete
ddb.put_item(
    TableName="Users",
    Item={
        "userId": {"S": "u1"},
        "createdAt": {"N": "1700000000"},
        "name": {"S": "Alice"},
        "active": {"BOOL": True},
    },
)
item = ddb.get_item(
    TableName="Users",
    Key={"userId": {"S": "u1"}, "createdAt": {"N": "1700000000"}},
)
print(item["Item"]["name"]["S"])  # Alice

# Query
result = ddb.query(
    TableName="Users",
    KeyConditionExpression="userId = :uid",
    ExpressionAttributeValues={":uid": {"S": "u1"}},
)

# Batch write
ddb.batch_write_item(
    RequestItems={
        "Users": [
            {"PutRequest": {"Item": {"userId": {"S": "u2"}, "createdAt": {"N": "1700000001"}, "name": {"S": "Bob"}}}},
        ]
    }
)

# TTL
ddb.update_time_to_live(
    TableName="Users",
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "expiresAt"},
)
```
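The `aws_resource` factory defined earlier is not exercised above. As a sketch, the same table can also be driven through boto3's higher-level `Table` interface, which maps plain Python values to DynamoDB types automatically (assuming the `Users` table created above):

```python
# Higher-level access via the resource API: plain Python values instead
# of {"S": ...} type annotations. Assumes the Users table from above.
table = aws_resource("dynamodb").Table("Users")

table.put_item(Item={"userId": "u3", "createdAt": 1700000002, "name": "Carol"})
resp = table.get_item(Key={"userId": "u3", "createdAt": 1700000002})
print(resp["Item"]["name"])  # Carol
```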
### SNS + SQS Fanout
```python
sns = aws_client("sns")
sqs = aws_client("sqs")

topic = sns.create_topic(Name="my-topic")
topic_arn = topic["TopicArn"]

queue = sqs.create_queue(QueueName="fan-queue")
queue_attrs = sqs.get_queue_attributes(
    QueueUrl=queue["QueueUrl"], AttributeNames=["QueueArn"]
)
queue_arn = queue_attrs["Attributes"]["QueueArn"]

sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)

# Publish — message is fanned out to subscribed SQS queues
sns.publish(TopicArn=topic_arn, Message="hello fanout", Subject="test")
```
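To confirm the fanout, poll the subscribed queue. On real AWS the SQS body is a JSON envelope whose `Message` field carries the payload (unless raw message delivery is enabled); whether MiniStack wraps it the same way is an assumption, so the sketch falls back to the raw body:

```python
# Receive the fanned-out message. The JSON envelope mirrors real
# SNS -> SQS delivery; MiniStack matching it exactly is an assumption.
import json

received = sqs.receive_message(QueueUrl=queue["QueueUrl"], MaxNumberOfMessages=1)
for msg in received.get("Messages", []):
    try:
        print(json.loads(msg["Body"])["Message"])  # hello fanout
    except (ValueError, KeyError):
        print(msg["Body"])  # raw delivery
```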
### Lambda
```python
import io
import json
import zipfile

# Create a zip with handler code
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("handler.py", """
def handler(event, context):
    print("event:", event)
    return {"statusCode": 200, "body": "ok"}
""")
buf.seek(0)

lam = aws_client("lambda")
lam.create_function(
    FunctionName="my-function",
    Runtime="python3.12",
    Role="arn:aws:iam::000000000000:role/role",
    Handler="handler.handler",
    Code={"ZipFile": buf.read()},
)

# Invoke synchronously
response = lam.invoke(
    FunctionName="my-function",
    InvocationType="RequestResponse",
    Payload=json.dumps({"key": "value"}),
)
result = json.loads(response["Payload"].read())
print(result)  # {"statusCode": 200, "body": "ok"}

# SQS event source mapping
lam.create_event_source_mapping(
    EventSourceArn=queue_arn,
    FunctionName="my-function",
    BatchSize=10,
    Enabled=True,
)
```
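With the mapping in place, a message sent to the source queue should be polled and handed to the function. How quickly the emulator's poller picks it up is an assumption, so check the function's logged output rather than expecting a synchronous result:

```python
# Send a message; the event source mapping should deliver it to
# my-function asynchronously. Polling latency is emulator-dependent.
sqs.send_message(QueueUrl=queue["QueueUrl"], MessageBody='{"job": 42}')
```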
### SecretsManager
```python
sm = aws_client("secretsmanager")

sm.create_secret(Name="db-password", SecretString='{"password":"s3cr3t"}')
secret = sm.get_secret_value(SecretId="db-password")
print(secret["SecretString"])  # {"password":"s3cr3t"}

sm.update_secret(SecretId="db-password", SecretString='{"password":"newpass"}')
sm.delete_secret(SecretId="db-password", ForceDeleteWithoutRecovery=True)
```

### SSM Parameter Store
```python
ssm = aws_client("ssm")

ssm.put_parameter(Name="/app/db/host", Value="localhost", Type="String")
ssm.put_parameter(Name="/app/db/password", Value="secret", Type="SecureString")

param = ssm.get_parameter(Name="/app/db/host")
print(param["Parameter"]["Value"])  # localhost

# Fetch all params under a path
params = ssm.get_parameters_by_path(Path="/app/", Recursive=True)
for p in params["Parameters"]:
    print(p["Name"], p["Value"])
```
### Kinesis
```python
kin = aws_client("kinesis")

kin.create_stream(StreamName="events", ShardCount=1)
kin.put_record(StreamName="events", Data=b'{"event":"click"}', PartitionKey="user1")

# Get records
shards = kin.list_shards(StreamName="events")
shard_id = shards["Shards"][0]["ShardId"]
iterator = kin.get_shard_iterator(
    StreamName="events",
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)
records = kin.get_records(ShardIterator=iterator["ShardIterator"])
for r in records["Records"]:
    print(r["Data"])  # boto3 base64-decodes record data to bytes for you
```
### EventBridge
```python
eb = aws_client("events")

# Create a custom bus
eb.create_event_bus(Name="my-bus")

# Put a rule targeting a Lambda
eb.put_rule(
    Name="my-rule",
    EventBusName="my-bus",
    EventPattern='{"source": ["myapp"]}',
    State="ENABLED",
)
eb.put_targets(
    Rule="my-rule",
    EventBusName="my-bus",
    Targets=[{"Id": "1", "Arn": "arn:aws:lambda:us-east-1:000000000000:function:my-function"}],
)

# Emit an event (triggers Lambda target)
eb.put_events(Entries=[{
    "Source": "myapp",
    "DetailType": "UserSignup",
    "Detail": '{"userId": "123"}',
    "EventBusName": "my-bus",
}])
```
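When a rule doesn't fire, it helps to check the pattern in isolation. boto3 exposes the standard `TestEventPattern` API for this; whether MiniStack implements it is an assumption worth a quick try:

```python
# Check a pattern against a synthetic event. TestEventPattern is a
# standard EventBridge API; MiniStack supporting it is an assumption.
import json

match = eb.test_event_pattern(
    EventPattern='{"source": ["myapp"]}',
    Event=json.dumps({
        "id": "1", "source": "myapp", "detail-type": "UserSignup",
        "account": "000000000000", "time": "2026-01-01T00:00:00Z",
        "region": "us-east-1", "resources": [], "detail": {"userId": "123"},
    }),
)
print(match["Result"])  # True
```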
### CloudWatch Logs
```python
import time

logs = aws_client("logs")

logs.create_log_group(logGroupName="/app/service")
logs.create_log_stream(logGroupName="/app/service", logStreamName="stream-1")
logs.put_log_events(
    logGroupName="/app/service",
    logStreamName="stream-1",
    logEvents=[
        {"timestamp": int(time.time() * 1000), "message": "App started"},
        {"timestamp": int(time.time() * 1000), "message": "Request received"},
    ],
)

events = logs.get_log_events(
    logGroupName="/app/service",
    logStreamName="stream-1",
)
for e in events["events"]:
    print(e["message"])

# Filter with glob patterns (* and ?), AND terms, -exclusions
filtered = logs.filter_log_events(
    logGroupName="/app/service",
    filterPattern="Request*",
)
```
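The services table below also lists metric filters for CloudWatch Logs. A sketch of wiring one up with the standard boto3 call; whether MiniStack aggregates the resulting metric is an assumption:

```python
# Count log events matching a pattern as a CloudWatch metric.
# Standard API; MiniStack actually aggregating it is an assumption.
logs.put_metric_filter(
    logGroupName="/app/service",
    filterName="request-count",
    filterPattern="Request*",
    metricTransformations=[{
        "metricName": "RequestCount",
        "metricNamespace": "App",
        "metricValue": "1",
    }],
)
```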
---

## Testing Patterns

### pytest fixture (recommended)
```python
import boto3
import pytest

MINISTACK_ENDPOINT = "http://localhost:4566"

@pytest.fixture(scope="session")
def aws_endpoint():
    return MINISTACK_ENDPOINT

@pytest.fixture
def s3_client(aws_endpoint):
    return boto3.client(
        "s3",
        endpoint_url=aws_endpoint,
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )

@pytest.fixture
def test_bucket(s3_client):
    bucket = "test-bucket"
    s3_client.create_bucket(Bucket=bucket)
    yield bucket
    # Cleanup
    objs = s3_client.list_objects_v2(Bucket=bucket).get("Contents", [])
    for obj in objs:
        s3_client.delete_object(Bucket=bucket, Key=obj["Key"])
    s3_client.delete_bucket(Bucket=bucket)

def test_upload_download(s3_client, test_bucket):
    s3_client.put_object(Bucket=test_bucket, Key="test.txt", Body=b"hello")
    resp = s3_client.get_object(Bucket=test_bucket, Key="test.txt")
    assert resp["Body"].read() == b"hello"
```
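The same shape extends to any service. A hypothetical companion fixture for an isolated per-test queue, mirroring `test_bucket` above:

```python
# Hypothetical fixture: a fresh SQS queue per test, torn down after.
@pytest.fixture
def test_queue(aws_endpoint):
    sqs = boto3.client(
        "sqs",
        endpoint_url=aws_endpoint,
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )
    url = sqs.create_queue(QueueName="test-queue")["QueueUrl"]
    yield sqs, url
    sqs.delete_queue(QueueUrl=url)
```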
### GitHub Actions CI integration
```yaml
# .github/workflows/test.yml
name: Test
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      ministack:
        image: nahuelnucera/ministack
        ports:
          - 4566:4566
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements.txt
      - run: pytest
        env:
          AWS_ACCESS_KEY_ID: test
          AWS_SECRET_ACCESS_KEY: test
          AWS_DEFAULT_REGION: us-east-1
          AWS_ENDPOINT_URL: http://localhost:4566
```

### Using AWS_ENDPOINT_URL env var (boto3 >= 1.28)
```python
import os
import boto3

# If AWS_ENDPOINT_URL is set, boto3 uses it automatically — no endpoint_url kwarg needed
os.environ["AWS_ENDPOINT_URL"] = "http://localhost:4566"

s3 = boto3.client("s3")  # picks up AWS_ENDPOINT_URL automatically
```

---

## Supported Services (25+)
| Service | Key Operations |
|---|---|
| S3 | CRUD, multipart, versioning, encryption, lifecycle, CORS, ACL, notifications |
| SQS | Standard & FIFO queues, DLQ, batch ops |
| SNS | Topics, subscriptions, fanout to SQS/Lambda, platform endpoints |
| DynamoDB | Tables, CRUD, Query, Scan, TTL, transactions, batch ops |
| Lambda | Python runtimes, invoke, SQS event sources, Function URLs |
| IAM | Users, roles, policies, groups, instance profiles, OIDC |
| STS | GetCallerIdentity, AssumeRole, GetSessionToken |
| SecretsManager | Full CRUD, rotation, versioning |
| SSM Parameter Store | String, SecureString, StringList, path queries |
| EventBridge | Buses, rules, targets, Lambda dispatch |
| Kinesis | Streams, shards, records, iterators |
| CloudWatch Metrics | PutMetricData, alarms, dashboards, CBOR protocol |
| CloudWatch Logs | Log groups/streams, filter with globs, metric filters |
| SES | Send email, templates, configuration sets |
| Step Functions | State machine CRUD |
| RDS | Spins up real Postgres/MySQL containers |
| ElastiCache | Spins up real Redis containers |
| Athena | Real SQL via DuckDB |
| ECS | Real Docker containers |
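A few services in the table have no walkthrough above. As a sketch, SES and STS follow the same client-factory pattern; the addresses are made up, and MiniStack accepting unverified senders is an assumption:

```python
# SES and STS through the same factory. Addresses are illustrative;
# an emulator accepting unverified senders is an assumption.
ses = aws_client("ses")
ses.send_email(
    Source="noreply@example.com",
    Destination={"ToAddresses": ["dev@example.com"]},
    Message={
        "Subject": {"Data": "Hello"},
        "Body": {"Text": {"Data": "Sent through MiniStack"}},
    },
)

sts = aws_client("sts")
print(sts.get_caller_identity()["Account"])
```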
---

## Troubleshooting

**Connection refused on port 4566**
```bash
# Check if ministack is running
curl http://localhost:4566/_localstack/health
# Start it
ministack
# or
docker run -p 4566:4566 nahuelnucera/ministack
```

**`NoCredentialsError` from boto3**
```bash
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
export AWS_DEFAULT_REGION=us-east-1
```
Any non-empty values work — MiniStack doesn't validate credentials.

**`InvalidSignatureException`**
- This is usually a region mismatch. Ensure `region_name="us-east-1"` matches across all clients.

**Lambda function not found after create**
- MiniStack executes Python runtimes with a warm worker pool. Wait briefly or invoke with `InvocationType="Event"` for async.

**S3 data lost on restart**
```bash
# Enable persistence
S3_PERSIST=1 ministack
# or in Docker
docker run -p 4566:4566 -e S3_PERSIST=1 -v $(pwd)/data:/data nahuelnucera/ministack
```

**Port conflict**
```bash
GATEWAY_PORT=5000 ministack
```
Then use http://localhost:5000 as the endpoint.
**Migrating from LocalStack**
- Keep all `http://localhost:4566` endpoint URLs — they stay the same.
- Remove `LOCALSTACK_AUTH_TOKEN` / `LOCALSTACK_API_KEY` env vars (not needed).
- Replace the `localstack/localstack` Docker image with `nahuelnucera/ministack`.
- All `boto3` client code works without modification.