Advanced Topics
Preview — 0.8.0-preview1: APIs may change before stable release.
FlowListener
FlowListener provides callbacks for flow, step, and transaction lifecycle events. Use listeners for logging, metrics, alerting, and audit trails.
Available Callbacks
Flow-Level:
| Callback | When Called |
|---|---|
onFlowStarted(flow) | Flow execution begins |
onFlowCompleted(flow, result) | Flow completes successfully |
onFlowFailed(flow, result) | Flow fails |
onFlowRestarting(flow, attempt, max, reason) | Flow restarts after rollback |
Step-Level:
| Callback | When Called |
|---|---|
onStepStarted(step, index, total) | Step execution begins |
onStepCompleted(step, result) | Step completes |
onStepFailed(step, result) | Step fails |
onStepRetry(step, attempt, max, error) | Before retry |
onStepRetryExhausted(step, attempts, error) | All retries failed |
onStepRebuilding(step, attempt, max, reason) | Step rebuilding after rollback |
Transaction-Level:
| Callback | When Called |
|---|---|
onTransactionSubmitted(step, txHash) | Tx submitted to network |
onTransactionInBlock(step, txHash, height) | Tx included in block |
onTransactionConfirmed(step, txHash) | Tx reaches confirmation threshold |
onConfirmationDepthChanged(step, txHash, depth, status) | Depth updates |
onTransactionRolledBack(step, txHash, height) | Chain reorg removes tx |
Example: Logging Listener
FlowListener loggingListener = new FlowListener() {
@Override
public void onFlowStarted(TxFlow flow) {
log.info("Flow started: {}", flow.getId());
}
@Override
public void onFlowCompleted(TxFlow flow, FlowResult result) {
log.info("Flow completed: {} with {} transactions",
flow.getId(), result.getTransactionHashes().size());
}
@Override
public void onFlowFailed(TxFlow flow, FlowResult result) {
log.error("Flow failed: {} - {}",
flow.getId(),
result.getError() != null ? result.getError().getMessage() : "unknown");
}
@Override
public void onTransactionSubmitted(FlowStep step, String txHash) {
log.info("Step '{}' submitted tx: {}", step.getId(), txHash);
}
};Composite Listener
Combine multiple listeners for different concerns. FlowListener.composite() catches exceptions from individual
listeners, so one failing listener won’t affect others.
FlowListener combined = FlowListener.composite(
metricsListener,
auditListener,
alertListener
);
FlowExecutor executor = FlowExecutor.create(backendService)
.withListener(combined);FlowRegistry
FlowRegistry provides centralized in-memory tracking of all active flows.
FlowRegistry registry = new InMemoryFlowRegistry();
FlowExecutor executor = FlowExecutor.create(backendService)
.withRegistry(registry); // Flows auto-register on execute()
// Query flows
registry.getActiveFlows();
registry.getFlowsByStatus(FlowStatus.COMPLETED);
registry.getFlow("escrow-flow");
registry.size();
registry.activeCount();FlowLifecycleListener
Receive notifications for registry-level events:
registry.addLifecycleListener(new FlowLifecycleListener() {
@Override
public void onFlowRegistered(String flowId, FlowHandle handle) {
log.info("Flow registered: {}", flowId);
}
@Override
public void onFlowCompleted(String flowId, FlowHandle handle, FlowResult result) {
log.info("Flow completed: {} status={}", flowId, result.getStatus());
}
@Override
public void onFlowStatusChanged(String flowId, FlowHandle handle,
FlowStatus oldStatus, FlowStatus newStatus) {
log.info("Flow {} status: {} -> {}", flowId, oldStatus, newStatus);
}
});FlowRegistry is in-memory only and does not survive restarts. Use FlowStateStore for persistence.
State Persistence & Recovery
Implement FlowStateStore to persist flow state to your storage backend (database, Redis, file, etc.) for
recovery after application restarts.
FlowStateStore Interface
public interface FlowStateStore {
void saveFlowState(FlowStateSnapshot snapshot);
List<FlowStateSnapshot> loadPendingFlows();
void updateTransactionState(String flowId, String stepId,
String txHash, TransactionStateDetails details);
void markFlowComplete(String flowId, FlowStatus status);
Optional<FlowStateSnapshot> getFlowState(String flowId);
boolean deleteFlow(String flowId);
}The executor persists state on key transitions: flow started, transaction submitted/confirmed/rolled back, flow completed.
Recovery on Startup
Recovery requires reconstructing the TxFlow object and passing the previous FlowResult to executor.resume():
// Load pending flows from store
List<FlowStateSnapshot> pending = stateStore.loadPendingFlows();
// Resume each flow
for (FlowStateSnapshot snapshot : pending) {
TxFlow flow = rebuildFlow(snapshot); // Application-specific reconstruction
FlowResult previousResult = toPreviousResult(snapshot); // Convert snapshot to FlowResult
FlowHandle handle = executor.resume(flow, previousResult);
}Wiring Together
FlowExecutor executor = FlowExecutor.create(backendService)
.withConfirmationConfig(ConfirmationConfig.defaults())
.withStateStore(myStateStore)
.withRegistry(new InMemoryFlowRegistry());
// State is persisted automatically during execution
FlowHandle handle = executor.execute(flow);Spring Boot Integration
Configuration Class
@Configuration
public class CardanoConfig {
@Value("${cardano.backend.url}")
private String backendUrl;
@Value("${cardano.backend.apiKey}")
private String apiKey;
@Bean
public BackendService backendService() {
return new BFBackendService(backendUrl, apiKey);
}
@Bean
public FlowExecutor flowExecutor(BackendService backendService) {
return FlowExecutor.create(backendService)
.withConfirmationConfig(ConfirmationConfig.defaults())
.withRollbackStrategy(RollbackStrategy.REBUILD_ENTIRE_FLOW);
}
}Service Layer
@Service
public class TransactionService {
private final FlowExecutor flowExecutor;
public TransactionService(FlowExecutor flowExecutor) {
this.flowExecutor = flowExecutor;
}
public FlowHandle submitPayment(String from, String to,
BigInteger amount, Account signer) {
TxFlow flow = TxFlow.builder("payment-" + UUID.randomUUID())
.addStep(FlowStep.builder("send")
.withTxContext(builder -> builder
.compose(new Tx()
.payToAddress(to, Amount.lovelace(amount))
.from(from))
.withSigner(SignerProviders.signerFrom(signer)))
.build())
.build();
return flowExecutor.execute(flow);
}
}Metrics with Micrometer
@Component
public class MetricsFlowListener implements FlowListener {
private final Counter flowsStarted;
private final Counter flowsCompleted;
private final Counter flowsFailed;
private final Counter rollbacksDetected;
public MetricsFlowListener(MeterRegistry registry) {
this.flowsStarted = registry.counter("txflow.flows.started");
this.flowsCompleted = registry.counter("txflow.flows.completed");
this.flowsFailed = registry.counter("txflow.flows.failed");
this.rollbacksDetected = registry.counter("txflow.rollbacks.detected");
}
@Override
public void onFlowStarted(TxFlow flow) { flowsStarted.increment(); }
@Override
public void onFlowCompleted(TxFlow flow, FlowResult result) { flowsCompleted.increment(); }
@Override
public void onFlowFailed(TxFlow flow, FlowResult result) { flowsFailed.increment(); }
@Override
public void onTransactionRolledBack(FlowStep step, String txHash, long height) {
rollbacksDetected.increment();
}
}Virtual Threads (Java 21+)
TxFlow supports virtual threads through the withExecutor(Executor) API. No library changes needed.
FlowExecutor executor = FlowExecutor.create(backendService)
.withExecutor(Executors.newVirtualThreadPerTaskExecutor())
.withConfirmationConfig(ConfirmationConfig.defaults());
// Each execute() call runs on its own virtual thread
FlowHandle handle1 = executor.execute(flow1);
FlowHandle handle2 = executor.execute(flow2);
// ... can handle thousands moreScaling
| Concurrent Flows | Platform Threads | Virtual Threads |
|---|---|---|
| 10 | Works fine | Works fine |
| 100 | May need tuning | Works fine |
| 1,000 | Thread pool limits | Works fine |
| 10,000 | Difficult | Works fine |
Java 11 Alternative
For Java 11 applications needing high concurrency:
ExecutorService executor = new ThreadPoolExecutor(
10, 1000, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy());
FlowExecutor flowExecutor = FlowExecutor.create(backendService)
.withExecutor(executor);TxFlow targets Java 11 for maximum compatibility. The library itself doesn’t use virtual threads, but applications running on Java 21+ can provide a virtual thread executor.