Async Pipelines
Powerful Asynchronous Programming with BoxFuture Pipelines
What Are Async Pipelines?
Async pipelines in BoxLang are powerful chains of asynchronous operations that allow you to compose complex workflows from simple, reusable components. Built on top of BoxFuture (which extends Java's CompletableFuture), they provide a fluent, functional programming approach to handling asynchronous operations.
Input → Transform → Process → Combine → Result
📥 ⚡️ 🔄 🔗 ✅
Think of pipelines as assembly lines for data processing: each stage performs a specific operation and passes its result to the next stage, all running asynchronously for maximum performance.
💡 Why Use Async Pipelines?
🔄 Non-blocking: Operations don't freeze your application
⚡ Performance: Parallel execution and optimized resource usage
🧩 Composability: Chain operations in a readable, maintainable way
🎯 Error Handling: Centralized exception management and recovery
🔀 Flexibility: Mix synchronous and asynchronous operations seamlessly
📊 Monitoring: Built-in logging and performance tracking
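Since BoxFuture extends Java's CompletableFuture, the non-blocking pipeline model can be sketched in plain Java. This is an illustrative sketch of the underlying machinery, not BoxLang syntax:

```java
import java.util.concurrent.CompletableFuture;

public class PipelineSketch {
    // Input → Transform → Process → Result, without blocking the caller:
    // each stage runs only when the previous one completes.
    static String run() {
        return CompletableFuture
            .supplyAsync(() -> "  raw input  ") // Input (runs on a worker thread)
            .thenApply(String::trim)            // Transform
            .thenApply(String::toUpperCase)     // Process
            .exceptionally(ex -> "FALLBACK")    // Centralized error handling
            .join();                            // Retrieve only when needed
    }

    public static void main(String[] args) {
        System.out.println(run()); // RAW INPUT
    }
}
```

The caller's thread stays free until `join()` is invoked, which is exactly the non-blocking property the bullets above describe.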
🎯 Entry Points: Creating BoxFutures
BoxLang provides two main entry points for creating async pipelines:
🆕 futureNew() BIF
The primary way to create BoxFuture instances, with maximum flexibility:
// Create an empty future (to be completed later)
future = futureNew()
// Create a completed future with a value
future = futureNew( "Hello World" )
// Create a future from a function (executed asynchronously)
future = futureNew( () => {
// Your async code here
return performExpensiveOperation()
} )
// Create with a specific executor
future = futureNew(
() => performCPUIntensiveTask(),
"cpu-tasks"
)
⚡ asyncRun() BIF
A simplified BIF for running code asynchronously:
// Run code asynchronously with default executor
future = asyncRun( () => {
return fetchDataFromAPI()
} )
// Run with specific executor
future = asyncRun(
() => processLargeFile(),
"cpu-tasks"
)
🎨 BoxFuture Pipeline Methods
BoxFuture extends CompletableFuture with BoxLang-specific enhancements and fluent methods:
🔄 Transformation Methods
| Method | Description | Returns | Example |
| --- | --- | --- | --- |
| `then( function )` | Transform the result synchronously | BoxFuture | `future.then( x => x * 2 )` |
| `thenAsync( function )` | Transform the result asynchronously | BoxFuture | `future.thenAsync( x => processAsync(x) )` |
| `then( function, executor )` | Transform with a specific executor | BoxFuture | `future.then( x => x + 1, "cpu-tasks" )` |
| `thenAsync( function, executor )` | Transform asynchronously with a specific executor | BoxFuture | `future.thenAsync( transform, "io-tasks" )` |
🚨 Error Handling Methods
| Method | Description | Returns | Example |
| --- | --- | --- | --- |
| `onError( function )` | Handle exceptions | BoxFuture | `future.onError( ex => handleError(ex) )` |
| `exceptionally( function )` | Handle exceptions (Java style) | BoxFuture | `future.exceptionally( ex => defaultValue )` |
⏰ Timeout Methods
| Method | Description | Returns | Example |
| --- | --- | --- | --- |
| `orTimeout( timeout )` | Time out after the given milliseconds | BoxFuture | `future.orTimeout( 5000 )` |
| `orTimeout( timeout, unit )` | Time out after the given duration | BoxFuture | `future.orTimeout( 5, "seconds" )` |
| `completeOnTimeout( value, timeout )` | Complete with a value on timeout (ms) | BoxFuture | `future.completeOnTimeout( "default", 3000 )` |
| `completeOnTimeout( value, timeout, unit )` | Complete with a value on timeout | BoxFuture | `future.completeOnTimeout( null, 1, "minute" )` |
📥 Result Retrieval Methods
| Method | Description | Returns | Example |
| --- | --- | --- | --- |
| `get()` | Get the result (blocking) | Object | `result = future.get()` |
| `get( timeout )` | Get the result with a timeout (ms) | Object | `result = future.get( 5000 )` |
| `get( timeout, unit )` | Get the result with a timeout | Object | `result = future.get( 30, "seconds" )` |
| `getOrDefault( defaultValue )` | Get the result or a default | Object | `result = future.getOrDefault( "fallback" )` |
| `join()` | Get the result (blocking, unchecked) | Object | `result = future.join()` |
| `joinOrDefault( defaultValue )` | Join with a default value | Object | `result = future.joinOrDefault( 0 )` |
| `getAsAttempt()` | Get the result as an Attempt object | Attempt | `attempt = future.getAsAttempt()` |
| `getAsAttempt( timeout )` | Get as an Attempt with a timeout (ms) | Attempt | `attempt = future.getAsAttempt( 5000 )` |
| `getAsAttempt( timeout, unit )` | Get as an Attempt with a timeout | Attempt | `attempt = future.getAsAttempt( 1, "minute" )` |
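As a rough mental model, these BoxFuture methods line up with CompletableFuture primitives. The mapping below is an assumption based on BoxFuture extending CompletableFuture, and the names in the sketch are purely illustrative:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class MethodMapping {
    // then()      ≈ thenApply()
    // thenAsync() ≈ thenApplyAsync()
    // onError()   ≈ exceptionally()
    // orTimeout() ≈ orTimeout(long, TimeUnit)
    static String pipeline() {
        return CompletableFuture
            .supplyAsync(() -> 21)
            .thenApply(x -> x * 2)                 // then( x => x * 2 )
            .orTimeout(5, TimeUnit.SECONDS)        // orTimeout( 5, "seconds" )
            .thenApply(x -> Integer.toString(x))
            .exceptionally(ex -> "fallback")       // onError( ex => "fallback" )
            .join();                               // join()
    }

    public static void main(String[] args) {
        System.out.println(pipeline()); // 42
    }
}
```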
🌊 Pipeline Examples
Basic Pipeline
// Simple transformation pipeline
result = futureNew( () => fetchUser(123) )
.then( user => user.name )
.then( name => name.toUpperCase() )
.onError( ex => "Unknown User" )
.get()
println( "User: #result#" )
Complex Data Processing Pipeline
// Multi-stage data processing
pipeline = futureNew( () => {
// Fetch data from multiple sources
return [
fetchFromDatabase(),
fetchFromAPI(),
fetchFromCache()
]
} )
.then( dataSources => {
// Combine and normalize data
return dataSources.reduce( (acc, source) => {
return acc.append( source.data )
}, [] )
} )
.thenAsync( data => {
// Process asynchronously with CPU executor
return processLargeDataset( data )
}, "cpu-tasks" )
.then( processedData => {
// Format for output
return {
"results": processedData,
"count": processedData.len(),
"timestamp": now()
}
} )
.onError( ex => {
// Handle any errors in the pipeline
writeLog( text: "Pipeline error: #ex.message#", log: "async" )
return { "error": true, "message": ex.message }
} )
// Execute the pipeline
result = pipeline.get( 30, "seconds" )
Parallel Processing with all()
// Process multiple items in parallel
userIds = [ 1, 2, 3, 4, 5 ]
// Create futures for each user
userFutures = userIds.map( id => {
return futureNew( () => fetchUserProfile( id ) )
} )
// Wait for all to complete
results = BoxFuture.all( userFutures )
.then( profiles => {
// Process all profiles together
return profiles.filter( profile => profile.isActive )
} )
.get()
println( "Active users: #results.len()#" )
Error Recovery Pipeline
// Pipeline with fallback strategies
robustPipeline = futureNew( () => {
// Try primary service
return primaryService.getData()
} )
.onError( ex => {
// Try secondary service
writeLog( text: "Primary failed, trying secondary", log: "async" )
return secondaryService.getData()
} )
.onError( ex => {
// Try cache as last resort
writeLog( text: "Secondary failed, using cache", log: "async" )
return cache.get( "fallback-data" )
} )
.then( data => {
// Process successfully retrieved data
return processData( data )
} )
.orTimeout( 10, "seconds" )
.onError( ex => {
// Ultimate fallback
return { "error": true, "message": "All services unavailable" }
} )
result = robustPipeline.get()
🔗 Combining Futures
Sequential Chaining
// Each step depends on the previous
chain = futureNew( () => authenticateUser() )
.then( token => fetchUserProfile( token ) )
.then( profile => loadUserPreferences( profile.id ) )
.then( preferences => customizeExperience( preferences ) )
.get()
Parallel Execution
// Run multiple operations in parallel
parallel = BoxFuture.all(
futureNew( () => loadUserData() ),
futureNew( () => loadSystemConfig() ),
futureNew( () => loadPermissions() )
)
.then( results => {
userData = results[1]
config = results[2]
permissions = results[3]
return initializeApplication( userData, config, permissions )
} )
.get()
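Under the hood, this gather-everything behavior can be pictured in terms of Java's CompletableFuture.allOf, which completes only when every input future does. This is a sketch of the concept, not the actual BoxFuture.all() implementation:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ParallelSketch {
    static List<Integer> loadAll() {
        // Three independent operations start immediately and run in parallel
        CompletableFuture<Integer> userData    = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> config      = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> permissions = CompletableFuture.supplyAsync(() -> 3);

        // allOf completes once all three have; join() on each is then instant
        return CompletableFuture.allOf(userData, config, permissions)
            .thenApply(v -> List.of(userData.join(), config.join(), permissions.join()))
            .join();
    }

    public static void main(String[] args) {
        System.out.println(loadAll()); // [1, 2, 3]
    }
}
```

Because all three futures are created before any of them is joined, total wall time is roughly that of the slowest operation rather than the sum of all three.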
🎯 Advanced Patterns
🔄 Retry Pattern
function retryAsync( operation, maxRetries = 3 ) {
return futureNew( () => {
var attempt = 0
while ( attempt < maxRetries ) {
try {
return operation()
} catch ( e ) {
attempt++
if ( attempt >= maxRetries ) throw e
sleep( 1000 * attempt ) // Linear backoff: wait longer after each failure
}
}
} )
}
// Usage
result = retryAsync( () => unreliableService.call() )
.orTimeout( 30, "seconds" )
.onError( ex => "Service unavailable" )
.get()
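The same retry loop can be sketched in plain Java, wrapping a flaky supplier. The flaky operation here is hypothetical, contrived to fail twice and then succeed:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetrySketch {
    static <T> CompletableFuture<T> retryAsync(Supplier<T> operation, int maxRetries) {
        return CompletableFuture.supplyAsync(() -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxRetries; attempt++) {
                try {
                    return operation.get();
                } catch (RuntimeException e) {
                    last = e;
                    // Linear backoff between attempts (short delay for the sketch)
                    try { Thread.sleep(10L * attempt); }
                    catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
                }
            }
            throw last; // All attempts exhausted: surface the final failure
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical flaky operation: fails twice, then succeeds
        String result = retryAsync(() -> {
            if (calls.incrementAndGet() < 3) throw new RuntimeException("boom");
            return "ok";
        }, 3).join();
        System.out.println(result + " after " + calls.get() + " calls");
    }
}
```

Note that the whole retry loop runs inside a single asynchronous task, so the caller is never blocked while the retries and backoff sleeps happen.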
🔀 Fan-out/Fan-in Pattern
// Fan-out: Split work across multiple processors
fanOut = futureNew( () => loadLargeDataset() )
.then( data => {
// Split data into chunks
chunks = data.chunk( 100 )
// Process each chunk in parallel
return BoxFuture.all(
chunks.map( chunk => {
return futureNew( () => processChunk( chunk ), "cpu-tasks" )
} )
)
} )
// Fan-in: Combine results
result = fanOut.then( processedChunks => {
// Combine all processed chunks
return processedChunks.reduce( (acc, chunk) => {
return acc.append( chunk )
}, [] )
} ).get()
🚪 Circuit Breaker Pattern
component CircuitBreaker {
property failures = 0
property threshold = 5
property isOpen = false
property lastFailTime = 0
property resetTimeout = 30000 // 30 seconds
function call( operation ) {
// Check if circuit should be closed
if ( isOpen && (getTickCount() - lastFailTime) > resetTimeout ) {
isOpen = false
failures = 0
}
if ( isOpen ) {
return futureNew( () => {
throw( "Circuit breaker is open" )
} )
}
return futureNew( operation )
.onError( ex => {
failures++
lastFailTime = getTickCount()
if ( failures >= threshold ) {
isOpen = true
writeLog( text: "Circuit breaker opened", log: "async" )
}
throw ex
} )
}
}
// Usage
breaker = new CircuitBreaker()
result = breaker.call( () => externalService.getData() )
.onError( ex => "Service temporarily unavailable" )
.get()
🔧 Executor Integration
Using Specific Executors
// CPU-intensive operations
cpuFuture = futureNew( () => {
return performComplexCalculation()
}, "cpu-tasks" )
// I/O operations
ioFuture = futureNew( () => {
return fetchFromDatabase()
}, "io-tasks" )
// Scheduled operations
scheduledFuture = futureNew( () => {
return generateReport()
}, "scheduled-tasks" )
Dynamic Executor Selection
function smartExecutorChoice( operationType ) {
switch ( operationType ) {
case "cpu":
return "cpu-tasks"
case "io":
return "io-tasks"
case "scheduled":
return "scheduled-tasks"
default:
return "io-tasks" // Default
}
}
// Usage
future = futureNew(
() => performOperation(),
smartExecutorChoice( "cpu" )
)
📊 Monitoring and Debugging
Performance Tracking
startTime = getTickCount()
result = futureNew( () => {
return expensiveOperation()
} )
.then( result => {
duration = getTickCount() - startTime
writeLog(
text: "Operation completed in #duration#ms",
log: "async"
)
return result
} )
.onError( ex => {
duration = getTickCount() - startTime
writeLog(
text: "Operation failed after #duration#ms: #ex.message#",
type: "Error",
log: "async"
)
return null
} )
.get()
Debug Pipeline
debugPipeline = futureNew( () => {
writeLog( text: "Starting data fetch", log: "async" )
return fetchData()
} )
.then( data => {
writeLog( text: "Data fetched, processing #data.len()# items", log: "async" )
return processData( data )
} )
.then( processed => {
writeLog( text: "Processing complete, #processed.len()# results", log: "async" )
return formatResults( processed )
} )
.onError( ex => {
writeLog(
text: "Pipeline error at step: #ex.message#",
type: "Error",
log: "async"
)
return { "error": true }
} )
🌟 Static Factory Methods
BoxFuture provides convenient static methods for common scenarios:
Creating Futures
// Completed future with value
completed = BoxFuture.completedFuture( "Hello World" )
// Failed future with exception
failed = BoxFuture.failedFuture( "Something went wrong" )
// Future from value
valueFuture = BoxFuture.ofValue( 42 )
// Future from CompletableFuture
javaFuture = CompletableFuture.completedFuture( "test" )
boxFuture = BoxFuture.ofCompletableFuture( javaFuture )
Delayed Execution
// Execute after delay
delayed = BoxFuture.run(
() => performDelayedTask(),
BoxFuture.delayedExecutor( 5, "seconds" )
)
// Execute with custom executor after delay
customDelayed = BoxFuture.run(
() => performTask(),
BoxFuture.delayedExecutor( 10, "seconds", executorGet( "cpu-tasks" ) )
)
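BoxFuture.delayedExecutor appears to mirror the standard CompletableFuture.delayedExecutor (an assumption based on the name). In plain Java the same delayed execution looks like this, with a short delay so the sketch runs quickly:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

public class DelayedSketch {
    static String delayedHello() {
        // The supplier is scheduled to run only after the delay has elapsed
        Executor delay = CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS);
        long start = System.nanoTime();
        String value = CompletableFuture.supplyAsync(() -> "hello", delay).join();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Report the value and whether the delay was actually honored
        return value + ":" + (elapsedMs >= 100);
    }

    public static void main(String[] args) {
        System.out.println(delayedHello());
    }
}
```

A delayed executor can be passed anywhere a normal executor is accepted, which is what makes the `BoxFuture.run( task, delayedExecutor )` pattern above compose so cleanly.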
🚨 Error Handling Best Practices
1. Always Handle Errors
// Good - Error handling included
future = futureNew( () => riskyOperation() )
.onError( ex => {
writeLog( text: "Operation failed: #ex.message#", log: "async" )
return defaultValue
} )
// Bad - No error handling
future = futureNew( () => riskyOperation() )
2. Use Timeouts
// Good - Prevents hanging indefinitely
result = futureNew( () => externalCall() )
.orTimeout( 30, "seconds" )
.onError( ex => "Timeout occurred" )
.get()
// Bad - Could hang forever
result = futureNew( () => externalCall() ).get()
3. Graceful Degradation
// Good - Multiple fallback strategies
result = futureNew( () => primaryService() )
.onError( ex => secondaryService() )
.onError( ex => cacheService() )
.onError( ex => "Service unavailable" )
.get()
🎯 Performance Tips
1. Choose the Right Executor
// CPU-intensive: Use cpu-tasks
cpuFuture = futureNew( () => calculatePi(1000000), "cpu-tasks" )
// I/O-intensive: Use io-tasks (default)
ioFuture = futureNew( () => fetchFromAPI() )
// Scheduled: Use scheduled-tasks
scheduledFuture = futureNew( () => cleanupTask(), "scheduled-tasks" )
2. Avoid Blocking Operations
// Good - Non-blocking chaining
result = futureNew( () => fetchData() )
.then( data => processData( data ) )
.then( processed => saveData( processed ) )
// Don't call .get() until you need the result
// Bad - Blocking at each step
data = futureNew( () => fetchData() ).get()
processed = futureNew( () => processData( data ) ).get()
result = futureNew( () => saveData( processed ) ).get()
3. Use Parallel Processing
// Good - Parallel execution
results = BoxFuture.all(
futureNew( () => task1() ),
futureNew( () => task2() ),
futureNew( () => task3() )
).get()
// Bad - Sequential execution
result1 = futureNew( () => task1() ).get()
result2 = futureNew( () => task2() ).get()
result3 = futureNew( () => task3() ).get()
🔍 Common Patterns and Use Cases
API Orchestration
// Orchestrate multiple API calls
apiResults = BoxFuture.all(
futureNew( () => userService.getUser( userId ) ),
futureNew( () => orderService.getOrders( userId ) ),
futureNew( () => preferencesService.getPreferences( userId ) )
)
.then( results => {
return {
"user": results[1],
"orders": results[2],
"preferences": results[3]
}
} )
.orTimeout( 10, "seconds" )
.get()
Data Pipeline Processing
// ETL pipeline
pipeline = futureNew( () => {
// Extract
return extractDataFromSources()
} )
.then( rawData => {
// Transform
return transformData( rawData )
}, "cpu-tasks" )
.then( transformedData => {
// Load
return loadDataToDestination( transformedData )
}, "io-tasks" )
.onError( ex => {
// Handle pipeline failure
writeLog( text: "ETL pipeline failed: #ex.message#", log: "async" )
return { "success": false, "error": ex.message }
} )
Real-time Processing
// Process streaming data
function processStream( dataStream ) {
return dataStream.map( item => {
return futureNew( () => processItem( item ) )
.orTimeout( 5, "seconds" )
.onError( ex => null ) // Skip failed items
} )
}
// Batch process results
results = BoxFuture.all( processStream( streamData ) )
.then( results => results.filter( r => r != null ) )
.get()
🔚 Conclusion
BoxFuture async pipelines provide a powerful, flexible way to handle asynchronous operations in BoxLang. By combining the fluent API with proper error handling, timeouts, and executor selection, you can build robust, high-performance applications that scale effectively.
Key takeaways:
- Use `futureNew()` and `asyncRun()` as your primary entry points
- Chain operations with `then()` and `thenAsync()` for readable pipelines
- Always include error handling with `onError()` and `exceptionally()`
- Use timeouts to prevent hanging operations
- Choose appropriate executors for different types of work
- Leverage parallel processing with `BoxFuture.all()` for performance
Start simple with basic transformations, then gradually incorporate more advanced patterns as your needs grow. The async pipeline approach will help you build more responsive, efficient applications that can handle complex asynchronous workflows with ease.