Skip to Content
0.8.0 PreviewTxFlowAdvanced Topics

Advanced Topics

Preview — 0.8.0-preview1: APIs may change before stable release.

FlowListener

FlowListener provides callbacks for flow, step, and transaction lifecycle events. Use listeners for logging, metrics, alerting, and audit trails.

Available Callbacks

Flow-Level:

CallbackWhen Called
onFlowStarted(flow)Flow execution begins
onFlowCompleted(flow, result)Flow completes successfully
onFlowFailed(flow, result)Flow fails
onFlowRestarting(flow, attempt, max, reason)Flow restarts after rollback

Step-Level:

CallbackWhen Called
onStepStarted(step, index, total)Step execution begins
onStepCompleted(step, result)Step completes
onStepFailed(step, result)Step fails
onStepRetry(step, attempt, max, error)Before retry
onStepRetryExhausted(step, attempts, error)All retries failed
onStepRebuilding(step, attempt, max, reason)Step rebuilding after rollback

Transaction-Level:

CallbackWhen Called
onTransactionSubmitted(step, txHash)Tx submitted to network
onTransactionInBlock(step, txHash, height)Tx included in block
onTransactionConfirmed(step, txHash)Tx reaches confirmation threshold
onConfirmationDepthChanged(step, txHash, depth, status)Depth updates
onTransactionRolledBack(step, txHash, height)Chain reorg removes tx

Example: Logging Listener

FlowListener loggingListener = new FlowListener() { @Override public void onFlowStarted(TxFlow flow) { log.info("Flow started: {}", flow.getId()); } @Override public void onFlowCompleted(TxFlow flow, FlowResult result) { log.info("Flow completed: {} with {} transactions", flow.getId(), result.getTransactionHashes().size()); } @Override public void onFlowFailed(TxFlow flow, FlowResult result) { log.error("Flow failed: {} - {}", flow.getId(), result.getError() != null ? result.getError().getMessage() : "unknown"); } @Override public void onTransactionSubmitted(FlowStep step, String txHash) { log.info("Step '{}' submitted tx: {}", step.getId(), txHash); } };

Composite Listener

Combine multiple listeners for different concerns. FlowListener.composite() catches exceptions from individual listeners, so one failing listener won’t affect others.

FlowListener combined = FlowListener.composite( metricsListener, auditListener, alertListener ); FlowExecutor executor = FlowExecutor.create(backendService) .withListener(combined);

FlowRegistry

FlowRegistry provides centralized in-memory tracking of all active flows.

FlowRegistry registry = new InMemoryFlowRegistry(); FlowExecutor executor = FlowExecutor.create(backendService) .withRegistry(registry); // Flows auto-register on execute() // Query flows registry.getActiveFlows(); registry.getFlowsByStatus(FlowStatus.COMPLETED); registry.getFlow("escrow-flow"); registry.size(); registry.activeCount();

FlowLifecycleListener

Receive notifications for registry-level events:

registry.addLifecycleListener(new FlowLifecycleListener() { @Override public void onFlowRegistered(String flowId, FlowHandle handle) { log.info("Flow registered: {}", flowId); } @Override public void onFlowCompleted(String flowId, FlowHandle handle, FlowResult result) { log.info("Flow completed: {} status={}", flowId, result.getStatus()); } @Override public void onFlowStatusChanged(String flowId, FlowHandle handle, FlowStatus oldStatus, FlowStatus newStatus) { log.info("Flow {} status: {} -> {}", flowId, oldStatus, newStatus); } });

FlowRegistry is in-memory only and does not survive restarts. Use FlowStateStore for persistence.

State Persistence & Recovery

Implement FlowStateStore to persist flow state to your storage backend (database, Redis, file, etc.) for recovery after application restarts.

FlowStateStore Interface

public interface FlowStateStore { void saveFlowState(FlowStateSnapshot snapshot); List<FlowStateSnapshot> loadPendingFlows(); void updateTransactionState(String flowId, String stepId, String txHash, TransactionStateDetails details); void markFlowComplete(String flowId, FlowStatus status); Optional<FlowStateSnapshot> getFlowState(String flowId); boolean deleteFlow(String flowId); }

The executor persists state on key transitions: flow started, transaction submitted/confirmed/rolled back, flow completed.

Recovery on Startup

Recovery requires reconstructing the TxFlow object and passing the previous FlowResult to executor.resume():

// Load pending flows from store List<FlowStateSnapshot> pending = stateStore.loadPendingFlows(); // Resume each flow for (FlowStateSnapshot snapshot : pending) { TxFlow flow = rebuildFlow(snapshot); // Application-specific reconstruction FlowResult previousResult = toPreviousResult(snapshot); // Convert snapshot to FlowResult FlowHandle handle = executor.resume(flow, previousResult); }

Wiring Together

FlowExecutor executor = FlowExecutor.create(backendService) .withConfirmationConfig(ConfirmationConfig.defaults()) .withStateStore(myStateStore) .withRegistry(new InMemoryFlowRegistry()); // State is persisted automatically during execution FlowHandle handle = executor.execute(flow);

Spring Boot Integration

Configuration Class

@Configuration public class CardanoConfig { @Value("${cardano.backend.url}") private String backendUrl; @Value("${cardano.backend.apiKey}") private String apiKey; @Bean public BackendService backendService() { return new BFBackendService(backendUrl, apiKey); } @Bean public FlowExecutor flowExecutor(BackendService backendService) { return FlowExecutor.create(backendService) .withConfirmationConfig(ConfirmationConfig.defaults()) .withRollbackStrategy(RollbackStrategy.REBUILD_ENTIRE_FLOW); } }

Service Layer

@Service public class TransactionService { private final FlowExecutor flowExecutor; public TransactionService(FlowExecutor flowExecutor) { this.flowExecutor = flowExecutor; } public FlowHandle submitPayment(String from, String to, BigInteger amount, Account signer) { TxFlow flow = TxFlow.builder("payment-" + UUID.randomUUID()) .addStep(FlowStep.builder("send") .withTxContext(builder -> builder .compose(new Tx() .payToAddress(to, Amount.lovelace(amount)) .from(from)) .withSigner(SignerProviders.signerFrom(signer))) .build()) .build(); return flowExecutor.execute(flow); } }

Metrics with Micrometer

@Component public class MetricsFlowListener implements FlowListener { private final Counter flowsStarted; private final Counter flowsCompleted; private final Counter flowsFailed; private final Counter rollbacksDetected; public MetricsFlowListener(MeterRegistry registry) { this.flowsStarted = registry.counter("txflow.flows.started"); this.flowsCompleted = registry.counter("txflow.flows.completed"); this.flowsFailed = registry.counter("txflow.flows.failed"); this.rollbacksDetected = registry.counter("txflow.rollbacks.detected"); } @Override public void onFlowStarted(TxFlow flow) { flowsStarted.increment(); } @Override public void onFlowCompleted(TxFlow flow, FlowResult result) { flowsCompleted.increment(); } @Override public void onFlowFailed(TxFlow flow, FlowResult result) { flowsFailed.increment(); } @Override public void onTransactionRolledBack(FlowStep step, String txHash, long height) { rollbacksDetected.increment(); } }

Virtual Threads (Java 21+)

TxFlow supports virtual threads through the withExecutor(Executor) API. No library changes needed.

FlowExecutor executor = FlowExecutor.create(backendService) .withExecutor(Executors.newVirtualThreadPerTaskExecutor()) .withConfirmationConfig(ConfirmationConfig.defaults()); // Each execute() call runs on its own virtual thread FlowHandle handle1 = executor.execute(flow1); FlowHandle handle2 = executor.execute(flow2); // ... can handle thousands more

Scaling

Concurrent FlowsPlatform ThreadsVirtual Threads
10Works fineWorks fine
100May need tuningWorks fine
1,000Thread pool limitsWorks fine
10,000DifficultWorks fine

Java 11 Alternative

For Java 11 applications needing high concurrency:

ExecutorService executor = new ThreadPoolExecutor( 10, 1000, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()); FlowExecutor flowExecutor = FlowExecutor.create(backendService) .withExecutor(executor);

TxFlow targets Java 11 for maximum compatibility. The library itself doesn’t use virtual threads, but applications running on Java 21+ can provide a virtual thread executor.

Last updated on