Skip to content

Commit c42392f

Browse files
authored
Merge pull request #85 from hyperledger/no-blocking-improvements
improve logic for no wait submission mode
2 parents 46e1c3b + 3bc26d3 commit c42392f

File tree

1 file changed

+22
-9
lines changed

1 file changed

+22
-9
lines changed

internal/perf/perf.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,10 @@ func (pr *perfRunner) Start() (err error) {
395395
assumedTokenCountPerSecond := 0
396396
for _, testConf := range pr.cfg.Tests {
397397
assumedTokenCountPerSecond += testConf.ActionsPerLoop * ((1 + testConf.Workers) * testConf.Workers / 2)
398+
if pr.cfg.MaxSubmissionsPerSecond > testConf.Workers {
399+
assumedTokenCountPerSecond = assumedTokenCountPerSecond * ((pr.cfg.MaxSubmissionsPerSecond / testConf.Workers) + 1)
400+
}
401+
// if we over pre-mint it doesn't matter.
398402
}
399403

400404
assumedTotalTokenRequired := (assumedTokenCountPerSecond * int(pr.cfg.Length.Seconds()) * 120 /*allow 20% extra*/) / 100
@@ -478,7 +482,7 @@ func (pr *perfRunner) Start() (err error) {
478482
}
479483
}
480484

481-
if prepEventTrackingID != "" && !pr.cfg.NoWaitSubmission && !pr.cfg.SkipMintConfirmations {
485+
if prepEventTrackingID != "" {
482486
log.Infof("Waiting for tracking %s", prepEventTrackingID)
483487
<-pr.wsReceivers[prepEventTrackingID]
484488
log.Infof("Prep action completed")
@@ -587,23 +591,30 @@ perfLoop:
587591

588592
pr.stopping = true
589593

590-
tallyStart := time.Now()
594+
idleStart := time.Now()
591595

592596
if pr.cfg.NoWaitSubmission {
593597
eventsCount := getMetricVal(receivedEventsCounter)
594598
submissionCount := getMetricVal(totalActionsCounter)
595-
log.Infof("<No wait submission mode> Wait for the event count %f to reach request sent count %f, within 30s", eventsCount, submissionCount)
599+
previousEventCount := eventsCount
600+
log.Infof("<No wait submission mode> Wait for the event count %f to reach request sent count %f", eventsCount, submissionCount)
596601
for {
602+
previousEventCount = eventsCount
603+
eventsCount = getMetricVal(receivedEventsCounter)
604+
if previousEventCount < eventsCount {
605+
// reset idle start time if there are new events
606+
idleStart = time.Now()
607+
}
597608
if eventsCount == submissionCount {
598609
break
599610
} else if eventsCount > submissionCount {
600611
log.Warnf("The number of events received %f is greater than the number of requests sent %f.", eventsCount, submissionCount)
601612
break
602613
}
603614

604-
// Check if more than 1 minute has passed
605-
if time.Since(tallyStart) > 30*time.Second {
606-
log.Errorf("The number of events received %f doesn't tally up to the number of requests sent %f after %s.", eventsCount, submissionCount, time.Since(time.Unix(pr.startTime, 0)))
615+
// Check if more than 30 seconds has passed
616+
if time.Since(idleStart) > 30*time.Second {
617+
log.Errorf("The number of events received %f doesn't tally up to the number of requests sent %f after 30s idle time, total tally time: %s.", eventsCount, submissionCount, time.Since(time.Unix(pr.startTime, 0)))
607618
break
608619
}
609620

@@ -655,7 +666,9 @@ perfLoop:
655666
log.Infof(" - Measured send TPS: %2f", tps.SendRate)
656667
log.Infof(" - Measured throughput: %2f", tps.Throughput)
657668
log.Infof(" - Measured send duration: %s", pr.sendTime)
658-
log.Infof(" - Measured event receiving duration: %s", pr.receiveTime)
669+
if !pr.cfg.NoWaitSubmission {
670+
log.Infof(" - Measured event receiving duration: %s", pr.receiveTime)
671+
}
659672
log.Infof(" - Measured total duration: %s", pr.totalTime)
660673

661674
return nil
@@ -814,7 +827,7 @@ func (pr *perfRunner) batchEventLoop(nodeURL string, wsconn wsclient.WSClient) (
814827

815828
pr.recordCompletedAction()
816829
// Release worker so it can continue to its next task
817-
if !pr.stopping && !pr.cfg.NoWaitSubmission && !pr.cfg.SkipMintConfirmations {
830+
if (!pr.stopping && !pr.cfg.NoWaitSubmission && !pr.cfg.SkipMintConfirmations) || pr.eventPrefixForCurrentStage == preparePrefix {
818831
if workerID >= 0 {
819832
preFixedWorkerID := fmt.Sprintf("%s%d", pr.eventPrefixForCurrentStage, workerID)
820833
// No need for locking as channel have built in support
@@ -902,7 +915,7 @@ func (pr *perfRunner) eventLoop(nodeURL string, wsconn wsclient.WSClient) (err e
902915
wsconn.Send(context.Background(), ackJSON)
903916
pr.recordCompletedAction()
904917
// Release worker so it can continue to its next task
905-
if !pr.stopping && !pr.cfg.NoWaitSubmission && !pr.cfg.SkipMintConfirmations {
918+
if (!pr.stopping && !pr.cfg.NoWaitSubmission && !pr.cfg.SkipMintConfirmations) || pr.eventPrefixForCurrentStage == preparePrefix {
906919
if workerID >= 0 {
907920
preFixedWorkerID := fmt.Sprintf("%s%d", pr.eventPrefixForCurrentStage, workerID)
908921
pr.wsReceivers[preFixedWorkerID] <- nodeURL

0 commit comments

Comments
 (0)