ministack-aws-emulator

MiniStack AWS Emulator

Skill by ara.so — Daily 2026 Skills collection.

MiniStack is a free, MIT-licensed drop-in replacement for LocalStack that emulates 25+ AWS services (S3, SQS, DynamoDB, Lambda, SNS, IAM, STS, Kinesis, EventBridge, SecretsManager, SSM, CloudWatch, SES, and more) on a single port (4566). No account, no API key, no telemetry. Works with boto3, AWS CLI, Terraform, CDK, and any SDK.

Installation

Option 1: PyPI (simplest)

```bash
pip install ministack
ministack
```

Server runs at http://localhost:4566. Change port: `GATEWAY_PORT=5000 ministack`.

Option 2: Docker Hub

```bash
docker run -p 4566:4566 nahuelnucera/ministack
```

Option 3: Docker Compose (from source)

```bash
git clone https://github.com/Nahuel990/ministack
cd ministack
docker compose up -d
```

Verify it's running

```bash
curl http://localhost:4566/_localstack/health
```

Configuration

| Environment Variable | Default | Description |
|---|---|---|
| `GATEWAY_PORT` | `4566` | Port to listen on |
| `S3_PERSIST` | `0` | Set to `1` to persist S3 data to disk |
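
Both variables can be combined on a single invocation; a quick sketch:

```bash
# Listen on a non-default port and persist S3 data to disk
GATEWAY_PORT=5000 S3_PERSIST=1 ministack
```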

AWS CLI Usage

Set credentials (any non-empty values work):

```bash
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
export AWS_DEFAULT_REGION=us-east-1
```

S3

```bash
aws --endpoint-url=http://localhost:4566 s3 mb s3://my-bucket
aws --endpoint-url=http://localhost:4566 s3 cp ./file.txt s3://my-bucket/
aws --endpoint-url=http://localhost:4566 s3 ls s3://my-bucket
```

SQS

```bash
aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name my-queue
aws --endpoint-url=http://localhost:4566 sqs list-queues
```

DynamoDB

```bash
aws --endpoint-url=http://localhost:4566 dynamodb list-tables
aws --endpoint-url=http://localhost:4566 dynamodb create-table \
  --table-name Users \
  --attribute-definitions AttributeName=userId,AttributeType=S \
  --key-schema AttributeName=userId,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST
```

STS (identity check)

```bash
aws --endpoint-url=http://localhost:4566 sts get-caller-identity
```

Use a named profile instead

```bash
aws configure --profile local
# Enter: test / test / us-east-1 / json
aws --profile local --endpoint-url=http://localhost:4566 s3 ls
```

awslocal wrapper (from source)

```bash
chmod +x bin/awslocal
./bin/awslocal s3 ls
./bin/awslocal dynamodb list-tables
```

boto3 Usage Patterns

Universal client factory

```python
import boto3

ENDPOINT = "http://localhost:4566"

def aws_client(service: str):
    return boto3.client(
        service,
        endpoint_url=ENDPOINT,
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )

def aws_resource(service: str):
    return boto3.resource(
        service,
        endpoint_url=ENDPOINT,
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )
```

S3

```python
s3 = aws_client("s3")
```

Create bucket and upload

```python
s3.create_bucket(Bucket="my-bucket")
s3.put_object(Bucket="my-bucket", Key="hello.txt", Body=b"Hello, MiniStack!")
```

Download

```python
obj = s3.get_object(Bucket="my-bucket", Key="hello.txt")
print(obj["Body"].read())  # b'Hello, MiniStack!'
```

List objects

```python
response = s3.list_objects_v2(Bucket="my-bucket")
for item in response.get("Contents", []):
    print(item["Key"])
```

Copy object

```python
s3.copy_object(
    Bucket="my-bucket",
    CopySource={"Bucket": "my-bucket", "Key": "hello.txt"},
    Key="hello-copy.txt",
)
```

Enable versioning

```python
s3.put_bucket_versioning(
    Bucket="my-bucket",
    VersioningConfiguration={"Status": "Enabled"},
)
```

Presigned URL (works locally)

```python
url = s3.generate_presigned_url(
    "get_object",
    Params={"Bucket": "my-bucket", "Key": "hello.txt"},
    ExpiresIn=3600,
)
```
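
To exercise the URL end to end, fetch it with plain HTTP; a minimal sketch using only the standard library:

```python
import urllib.request

# No SDK needed: the signature travels in the URL's query string
with urllib.request.urlopen(url) as resp:
    print(resp.read())  # b'Hello, MiniStack!'
```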

SQS

```python
sqs = aws_client("sqs")
```

Standard queue

```python
queue = sqs.create_queue(QueueName="my-queue")
queue_url = queue["QueueUrl"]

sqs.send_message(QueueUrl=queue_url, MessageBody='{"event": "user_signup"}')

messages = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)
for msg in messages.get("Messages", []):
    print(msg["Body"])
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
```

FIFO queue

```python
fifo = sqs.create_queue(
    QueueName="my-queue.fifo",
    Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)
```
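
Sends to a FIFO queue require a `MessageGroupId` (the ordering scope); with `ContentBasedDeduplication` enabled, the deduplication ID is derived from the body. A short sketch:

```python
# Any stable key works as the group ID; ordering is per group
sqs.send_message(
    QueueUrl=fifo["QueueUrl"],
    MessageBody='{"event": "order_placed"}',
    MessageGroupId="orders",
)
```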

Dead-letter queue setup

```python
import json

dlq = sqs.create_queue(QueueName="my-dlq")
dlq_attrs = sqs.get_queue_attributes(
    QueueUrl=dlq["QueueUrl"], AttributeNames=["QueueArn"]
)
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_attrs["Attributes"]["QueueArn"],
            "maxReceiveCount": "3",
        })
    },
)
```

DynamoDB

```python
ddb = aws_client("dynamodb")
```

Create table

```python
ddb.create_table(
    TableName="Users",
    KeySchema=[
        {"AttributeName": "userId", "KeyType": "HASH"},
        {"AttributeName": "createdAt", "KeyType": "RANGE"},
    ],
    AttributeDefinitions=[
        {"AttributeName": "userId", "AttributeType": "S"},
        {"AttributeName": "createdAt", "AttributeType": "N"},
    ],
    BillingMode="PAY_PER_REQUEST",
)
```

Put / Get / Delete

```python
ddb.put_item(
    TableName="Users",
    Item={
        "userId": {"S": "u1"},
        "createdAt": {"N": "1700000000"},
        "name": {"S": "Alice"},
        "active": {"BOOL": True},
    },
)

item = ddb.get_item(
    TableName="Users",
    Key={"userId": {"S": "u1"}, "createdAt": {"N": "1700000000"}},
)
print(item["Item"]["name"]["S"])  # Alice
```

Query

```python
result = ddb.query(
    TableName="Users",
    KeyConditionExpression="userId = :uid",
    ExpressionAttributeValues={":uid": {"S": "u1"}},
)
```

Batch write

```python
ddb.batch_write_item(
    RequestItems={
        "Users": [
            {"PutRequest": {"Item": {
                "userId": {"S": "u2"},
                "createdAt": {"N": "1700000001"},
                "name": {"S": "Bob"},
            }}},
        ]
    }
)
```

TTL

```python
ddb.update_time_to_live(
    TableName="Users",
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "expiresAt"},
)
```
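
The `aws_resource` factory defined earlier pays off here: the resource-level Table API takes plain Python values instead of typed attribute maps. A small sketch against the same table:

```python
# Higher-level API: no {"S": ...} / {"N": ...} wrappers needed
table = aws_resource("dynamodb").Table("Users")
table.put_item(Item={"userId": "u3", "createdAt": 1700000002, "name": "Carol"})
resp = table.get_item(Key={"userId": "u3", "createdAt": 1700000002})
print(resp["Item"]["name"])  # Carol
```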

SNS + SQS Fanout

```python
sns = aws_client("sns")
sqs = aws_client("sqs")

topic = sns.create_topic(Name="my-topic")
topic_arn = topic["TopicArn"]

queue = sqs.create_queue(QueueName="fan-queue")
queue_attrs = sqs.get_queue_attributes(
    QueueUrl=queue["QueueUrl"], AttributeNames=["QueueArn"]
)
queue_arn = queue_attrs["Attributes"]["QueueArn"]

sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)
```

Publish — message is fanned out to subscribed SQS queues

```python
sns.publish(TopicArn=topic_arn, Message="hello fanout", Subject="test")
```
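
To confirm the fanout, read the queue back. Assuming MiniStack wraps deliveries in the standard SNS envelope the way AWS does, the payload sits under the `Message` key:

```python
import json

received = sqs.receive_message(QueueUrl=queue["QueueUrl"], WaitTimeSeconds=2)
envelope = json.loads(received["Messages"][0]["Body"])
print(envelope["Message"])  # hello fanout
```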

Lambda

```python
import zipfile, io
```

Create a zip with handler code

```python
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("handler.py", """
def handler(event, context):
    print("event:", event)
    return {"statusCode": 200, "body": "ok"}
""")
buf.seek(0)

lam = aws_client("lambda")
lam.create_function(
    FunctionName="my-function",
    Runtime="python3.12",
    Role="arn:aws:iam::000000000000:role/role",
    Handler="handler.handler",
    Code={"ZipFile": buf.read()},
)
```

Invoke synchronously

```python
import json

response = lam.invoke(
    FunctionName="my-function",
    InvocationType="RequestResponse",
    Payload=json.dumps({"key": "value"}),
)
result = json.loads(response["Payload"].read())
print(result)  # {"statusCode": 200, "body": "ok"}
```

SQS event source mapping

```python
lam.create_event_source_mapping(
    EventSourceArn=queue_arn,
    FunctionName="my-function",
    BatchSize=10,
    Enabled=True,
)
```

SecretsManager

```python
sm = aws_client("secretsmanager")

sm.create_secret(Name="db-password", SecretString='{"password":"s3cr3t"}')
secret = sm.get_secret_value(SecretId="db-password")
print(secret["SecretString"])  # {"password":"s3cr3t"}

sm.update_secret(SecretId="db-password", SecretString='{"password":"newpass"}')
sm.delete_secret(SecretId="db-password", ForceDeleteWithoutRecovery=True)
```

SSM Parameter Store

```python
ssm = aws_client("ssm")

ssm.put_parameter(Name="/app/db/host", Value="localhost", Type="String")
ssm.put_parameter(Name="/app/db/password", Value="secret", Type="SecureString")

param = ssm.get_parameter(Name="/app/db/host")
print(param["Parameter"]["Value"])  # localhost
```

Fetch all params under a path

```python
params = ssm.get_parameters_by_path(Path="/app/", Recursive=True)
for p in params["Parameters"]:
    print(p["Name"], p["Value"])
```
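
For the SecureString parameter above, pass `WithDecryption=True` to read the plaintext; a small sketch, assuming the emulator honors the flag the way AWS does:

```python
# Without WithDecryption, SecureString values come back encrypted
param = ssm.get_parameter(Name="/app/db/password", WithDecryption=True)
print(param["Parameter"]["Value"])  # secret
```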

Kinesis

```python
kin = aws_client("kinesis")

kin.create_stream(StreamName="events", ShardCount=1)
kin.put_record(StreamName="events", Data=b'{"event":"click"}', PartitionKey="user1")
```

Get records

```python
shards = kin.list_shards(StreamName="events")
shard_id = shards["Shards"][0]["ShardId"]

iterator = kin.get_shard_iterator(
    StreamName="events",
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)
records = kin.get_records(ShardIterator=iterator["ShardIterator"])
for r in records["Records"]:
    print(r["Data"])  # boto3 returns Data as raw bytes, already base64-decoded
```

EventBridge

```python
eb = aws_client("events")
```

Create a custom bus

```python
eb.create_event_bus(Name="my-bus")
```

Put a rule targeting a Lambda

```python
eb.put_rule(
    Name="my-rule",
    EventBusName="my-bus",
    EventPattern='{"source": ["myapp"]}',
    State="ENABLED",
)
eb.put_targets(
    Rule="my-rule",
    EventBusName="my-bus",
    Targets=[{"Id": "1", "Arn": "arn:aws:lambda:us-east-1:000000000000:function:my-function"}],
)
```

Emit an event (triggers Lambda target)

```python
eb.put_events(Entries=[{
    "Source": "myapp",
    "DetailType": "UserSignup",
    "Detail": '{"userId": "123"}',
    "EventBusName": "my-bus",
}])
```

CloudWatch Logs

```python
import time

logs = aws_client("logs")

logs.create_log_group(logGroupName="/app/service")
logs.create_log_stream(logGroupName="/app/service", logStreamName="stream-1")

logs.put_log_events(
    logGroupName="/app/service",
    logStreamName="stream-1",
    logEvents=[
        {"timestamp": int(time.time() * 1000), "message": "App started"},
        {"timestamp": int(time.time() * 1000), "message": "Request received"},
    ],
)

events = logs.get_log_events(
    logGroupName="/app/service",
    logStreamName="stream-1",
)
for e in events["events"]:
    print(e["message"])
```

Filter with glob patterns (`*` and `?`), AND-ed terms, and `-` exclusions:

```python
filtered = logs.filter_log_events(
    logGroupName="/app/service",
    filterPattern="Request*",
)
```
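
Terms separated by spaces are AND-ed, and a leading `-` excludes; for example:

```python
# Match events containing "Request" but not "error"
filtered = logs.filter_log_events(
    logGroupName="/app/service",
    filterPattern="Request* -error",
)
```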

---

Testing Patterns

pytest fixture (recommended)

```python
import pytest
import boto3

MINISTACK_ENDPOINT = "http://localhost:4566"

@pytest.fixture(scope="session")
def aws_endpoint():
    return MINISTACK_ENDPOINT

@pytest.fixture
def s3_client(aws_endpoint):
    return boto3.client(
        "s3",
        endpoint_url=aws_endpoint,
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )

@pytest.fixture
def test_bucket(s3_client):
    bucket = "test-bucket"
    s3_client.create_bucket(Bucket=bucket)
    yield bucket
    # Cleanup
    objs = s3_client.list_objects_v2(Bucket=bucket).get("Contents", [])
    for obj in objs:
        s3_client.delete_object(Bucket=bucket, Key=obj["Key"])
    s3_client.delete_bucket(Bucket=bucket)

def test_upload_download(s3_client, test_bucket):
    s3_client.put_object(Bucket=test_bucket, Key="test.txt", Body=b"hello")
    resp = s3_client.get_object(Bucket=test_bucket, Key="test.txt")
    assert resp["Body"].read() == b"hello"
```

GitHub Actions CI integration

.github/workflows/test.yml:

```yaml
name: Test
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    services:
      ministack:
        image: nahuelnucera/ministack
        ports:
          - 4566:4566
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements.txt
      - run: pytest
        env:
          AWS_ACCESS_KEY_ID: test
          AWS_SECRET_ACCESS_KEY: test
          AWS_DEFAULT_REGION: us-east-1
          AWS_ENDPOINT_URL: http://localhost:4566
```

Using AWS_ENDPOINT_URL env var (boto3 >= 1.28)

```bash
export AWS_ENDPOINT_URL=http://localhost:4566
```

```python
import boto3

# If AWS_ENDPOINT_URL is set, boto3 picks it up automatically;
# no endpoint_url kwarg needed
s3 = boto3.client("s3")
```
---

Supported Services (25+)

| Service | Key Operations |
|---|---|
| S3 | CRUD, multipart, versioning, encryption, lifecycle, CORS, ACL, notifications |
| SQS | Standard & FIFO queues, DLQ, batch ops |
| SNS | Topics, subscriptions, fanout to SQS/Lambda, platform endpoints |
| DynamoDB | Tables, CRUD, Query, Scan, TTL, transactions, batch ops |
| Lambda | Python runtimes, invoke, SQS event sources, Function URLs |
| IAM | Users, roles, policies, groups, instance profiles, OIDC |
| STS | GetCallerIdentity, AssumeRole, GetSessionToken |
| SecretsManager | Full CRUD, rotation, versioning |
| SSM Parameter Store | String, SecureString, StringList, path queries |
| EventBridge | Buses, rules, targets, Lambda dispatch |
| Kinesis | Streams, shards, records, iterators |
| CloudWatch Metrics | PutMetricData, alarms, dashboards, CBOR protocol |
| CloudWatch Logs | Log groups/streams, filter with globs, metric filters |
| SES | Send email, templates, configuration sets |
| Step Functions | State machine CRUD |
| RDS | Spins up real Postgres/MySQL containers |
| ElastiCache | Spins up real Redis containers |
| Athena | Real SQL via DuckDB |
| ECS | Real Docker containers |
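
SES appears in the table but has no example in this guide; a minimal hedged sketch using the same `aws_client` factory (both addresses are placeholders):

```python
ses = aws_client("ses")

# SES normally requires verifying the sender identity first
ses.verify_email_identity(EmailAddress="dev@example.com")
ses.send_email(
    Source="dev@example.com",
    Destination={"ToAddresses": ["user@example.com"]},
    Message={
        "Subject": {"Data": "Hello"},
        "Body": {"Text": {"Data": "Sent via MiniStack"}},
    },
)
```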

Troubleshooting

**Connection refused on port 4566**
```bash
# Check whether ministack is running; if not, start it
ministack
# or
docker run -p 4566:4566 nahuelnucera/ministack
```

**`NoCredentialsError` from boto3**
```bash
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
export AWS_DEFAULT_REGION=us-east-1
```

Any non-empty values work — MiniStack doesn't validate credentials

**`InvalidSignatureException`**
- This is usually a region mismatch. Ensure `region_name="us-east-1"` matches across all clients.

**Lambda function not found after create**
- MiniStack executes Python runtimes with a warm worker pool. Wait briefly or invoke with `InvocationType="Event"` for async.

**S3 data lost on restart**
```bash
# Enable persistence
S3_PERSIST=1 ministack
# or in Docker
docker run -p 4566:4566 -e S3_PERSIST=1 -v $(pwd)/data:/data nahuelnucera/ministack
```

**Port conflict**
```bash
GATEWAY_PORT=5000 ministack
```

Then use http://localhost:5000 as the endpoint.

**Migrating from LocalStack**
- Keep all `http://localhost:4566` endpoint URLs; they stay the same.
- Remove `LOCALSTACK_AUTH_TOKEN` / `LOCALSTACK_API_KEY` env vars (not needed).
- Replace the `localstack/localstack` Docker image with `nahuelnucera/ministack` (see the sketch after this list).
- All `boto3` client code works without modification.
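
For a Compose-based setup, the switch is typically a one-line image swap; a hedged sketch (the service name is illustrative):

```yaml
services:
  aws:
    image: nahuelnucera/ministack   # was: localstack/localstack
    ports:
      - "4566:4566"
```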
