# Rust Backend Guidelines

Rust coding guidelines for the Windmill backend. MUST use when writing or modifying Rust code in the `backend/` directory.

Install: `npx skill4agent add windmill-labs/windmill rust-backend`

## Type-Driven Design

Model domain concepts with `struct` and `enum` types rather than primitives: use the newtype pattern, `enum` state machines, `NonZeroU32`, `Duration`, `&str` over `String` for borrowed text, `Arc<T>` for shared ownership, and `Cow<'a, T>` when a value is sometimes borrowed and sometimes owned.

// State machine with enum
enum JobState {
Pending { scheduled_for: DateTime<Utc> },
Running { started_at: DateTime<Utc>, worker: String },
Completed { result: JobResult, duration_ms: i64 },
Failed { error: String, retries: u32 },
}
// Avoid multiple booleans
struct Job {
is_pending: bool, // Don't do this
is_running: bool,
is_completed: bool,
}

## `impl` Block Organization

Order methods within an `impl` block consistently: constructors, getters, mutation methods, then domain logic.

struct JobQueue {
jobs: Vec<Job>,
capacity: usize,
}
impl JobQueue {
// Constructors first
pub fn new(capacity: usize) -> Self { ... }
pub fn with_jobs(jobs: Vec<Job>) -> Self { ... }
// Getters
pub fn len(&self) -> usize { ... }
pub fn is_empty(&self) -> bool { ... }
// Mutation methods
pub fn push(&mut self, job: Job) -> Result<()> { ... }
pub fn pop(&mut self) -> Option<Job> { ... }
// Domain logic
pub fn next_scheduled(&self) -> Option<&Job> { ... }
}

## Iterator Chains

Prefer `.filter().map().collect()` chains over manual accumulation loops.

// Preferred
let results: Vec<_> = items
.iter()
.filter(|item| item.is_valid())
.map(|item| item.transform())
.collect();
// Avoid
let mut results = Vec::new();
for item in items.iter() {
if item.is_valid() {
results.push(item.transform());
}
}

## Error Handling

Use the shared `Error` type from `windmill_common::error`. Functions return `Result<T, Error>`; HTTP handlers return `JsonResult<T>`.

use windmill_common::error::{Error, Result};
// Use ? operator for propagation
pub async fn get_job(db: &DB, id: Uuid) -> Result<Job> {
let job = sqlx::query_as!(Job, "SELECT ... WHERE id = $1", id)
.fetch_optional(db)
.await?
.ok_or_else(|| Error::NotFound("job not found".to_string()))?;
Ok(job)
}

## Guard Clauses

Prefer `let ... else` over nested `if let` when the happy path continues below the guard:

let Some(config) = get_config() else {
return Err(Error::MissingConfig);
};

Avoid `.unwrap()` outside tests. Keep functions flat with early returns:

// Preferred - early returns
fn process_job(job: Option<Job>) -> Result<Output> {
let Some(job) = job else {
return Ok(Output::default());
};
if !job.is_valid() {
return Err(Error::InvalidJob);
}
if job.is_cached() {
return Ok(job.cached_result());
}
// Main logic at the end, not nested
execute_job(job)
}
// Avoid - deep nesting
fn process_job(job: Option<Job>) -> Result<Output> {
if let Some(job) = job {
if job.is_valid() {
if !job.is_cached() {
execute_job(job)
} else {
Ok(job.cached_result())
}
} else {
Err(Error::InvalidJob)
}
} else {
Ok(Output::default())
}
}

## Variable Shadowing

Shadow a variable through its transformation stages instead of inventing a new name for each step:

// Preferred
let data = fetch_raw_data();
let data = parse(data);
let data = validate(data)?;
// Avoid
let raw_data = fetch_raw_data();
let parsed_data = parse(raw_data);
let validated_data = validate(parsed_data)?;

Document public items with `///` doc comments.

## Enums over Boolean Flags

// Preferred
enum JobStatus {
Pending,
Running,
Completed,
}
// Avoid
struct Job {
is_running: bool,
is_completed: bool,
}

## Exhaustive Matching

// Explicit matching preferred
match status {
JobStatus::Pending => handle_pending(),
JobStatus::Running => handle_running(),
JobStatus::Completed => handle_completed(),
}
// Wildcards OK for fallback
match result {
Ok(value) => process(value),
Err(_) => return default_value(),
}
// Wildcards OK for ignoring fields in destructuring
let Point { x, y, .. } = point;

## Axum Extractors

Destructure extractors directly in handler signatures:

// Preferred
async fn process_job(
Extension(db): Extension<DB>,
Path((workspace, job_id)): Path<(String, Uuid)>,
Query(pagination): Query<Pagination>,
) -> Result<Json<Job>> {
// ...
}
// Avoid
async fn process_job(
db_ext: Extension<DB>,
path: Path<(String, Uuid)>,
query: Query<Pagination>,
) -> Result<Json<Job>> {
let Extension(db) = db_ext;
let Path((workspace, job_id)) = path;
// ...
}

## Type Conversions

// Implement From/Into for type conversions
impl From<DbJob> for ApiJob {
fn from(db: DbJob) -> Self {
ApiJob {
id: db.id,
status: db.status.into(),
}
}
}
// Use TryFrom for fallible conversions
impl TryFrom<String> for JobKind {
type Error = Error;
fn try_from(s: String) -> Result<Self, Self::Error> { ... }
}

## Derives and Visibility

Use `derive` for common traits:

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job { ... }

Prefer `pub(crate)` over `pub` for internal items:

// Prefer restricted visibility
pub(crate) fn internal_helper() { ... }
// Only pub for external API
pub fn create_job(...) -> Result<Job> { ... }

## JSON Handling

Prefer `Box<serde_json::value::RawValue>` over `serde_json::Value` for pass-through payloads:

// Preferred - avoids parsing/serialization overhead
pub struct Job {
pub id: Uuid,
pub args: Option<Box<serde_json::value::RawValue>>,
}
// Only use Value when you need to inspect/modify JSON
let value: serde_json::Value = serde_json::from_str(&json)?;
if let Some(field) = value.get("field") {
// modify or inspect
}

Use serde field attributes to shape the wire format:

#[derive(Serialize, Deserialize)]
pub struct Job {
#[serde(rename = "jobId")]
pub id: Uuid,
#[serde(default)]
pub priority: i32,
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_job: Option<Uuid>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub tags: Vec<String>,
}

Borrow during deserialization to avoid allocations:

#[derive(Deserialize)]
pub struct JobInput<'a> {
#[serde(borrow)]
pub workspace_id: Cow<'a, str>,
#[serde(borrow)]
pub script_path: &'a str,
}

## SQL (sqlx)

Avoid `SELECT *`; list columns explicitly.

// Preferred - explicit columns
sqlx::query_as!(
Job,
"SELECT id, workspace_id, path, created_at FROM v2_job WHERE id = $1",
job_id
)
// Avoid - breaks when columns are added
sqlx::query_as!(Job, "SELECT * FROM v2_job WHERE id = $1", job_id)

Batch database work instead of issuing one query per row:

// Preferred - single query with multiple values
sqlx::query!(
"INSERT INTO job_logs (job_id, logs) VALUES ($1, $2), ($3, $4)",
id1, log1, id2, log2
)
// Avoid N+1 queries
for id in ids {
sqlx::query!("SELECT ... WHERE id = $1", id).fetch_one(db).await?;
}
// Preferred - single query with IN clause
sqlx::query!("SELECT ... WHERE id = ANY($1)", &ids[..]).fetch_all(db).await?

## Async

Use `spawn_blocking` for CPU-heavy or blocking work:

// Preferred - offload blocking work
let result = tokio::task::spawn_blocking(move || {
expensive_computation(&data)
}).await?;
// Avoid - blocks the runtime
let result = expensive_computation(&data); // Don't do this in async

Use tokio primitives, not their blocking std counterparts, in async code:

use tokio::sync::mpsc;
use tokio::time::sleep;
// Avoid in async contexts
use std::thread::sleep; // Blocks the runtime

Prefer bounded channels:

// Preferred - bounded channel prevents overwhelming
let (tx, rx) = tokio::sync::mpsc::channel(100);
// Be careful with unbounded
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

## Mutex Choice

Use `std::sync::Mutex` (or `parking_lot::Mutex`) for protecting plain data; reserve `tokio::sync::Mutex` for locks that must be held across an `.await`:

// Preferred for data protection - std mutex is faster
use std::sync::Mutex;
struct Cache {
data: Mutex<HashMap<String, Value>>,
}
impl Cache {
fn get(&self, key: &str) -> Option<Value> {
self.data.lock().unwrap().get(key).cloned()
}
fn insert(&self, key: String, value: Value) {
self.data.lock().unwrap().insert(key, value);
}
}

Use `tokio::sync::Mutex` only when the guard must live across an `.await` point:

use tokio::sync::Mutex;
use std::sync::Arc;
// Async mutex for IO resources held across await points
let conn = Arc::new(Mutex::new(db_connection));
async fn execute_query(conn: Arc<Mutex<DbConn>>, query: &str) {
let mut lock = conn.lock().await;
lock.execute(query).await; // Lock held across .await
}

Encapsulate `Arc<Mutex<...>>`-style shared state behind a type rather than passing raw lock handles around:

struct SharedState {
inner: std::sync::Mutex<StateInner>,
}
impl SharedState {
fn update(&self, value: i32) {
self.inner.lock().unwrap().value = value;
}
fn get(&self) -> i32 {
self.inner.lock().unwrap().value
}
}

For IO resources, prefer a dedicated task that owns the resource and receives commands over a channel:

let (tx, mut rx) = tokio::sync::mpsc::channel(32);
tokio::spawn(async move {
while let Some(cmd) = rx.recv().await {
handle_io_command(&mut resource, cmd).await;
}
});

## Verification

Run `cargo check` to validate changes; it is much faster than a full `cargo build`.