devops

Optimism Batcher 코드 까보기 본문

Layer2

Optimism Batcher 코드 까보기

vata500 2023. 8. 26. 22:14
반응형

Optimism의 op-batcher는 Layer1에 Tx Batch를 생성하여 압축, 전송하는 역할을 담당한다. 코드를 통해서 간단히 알아보자.

# op-batcher 

func main() {
	oplog.SetupDefaults()

	app := cli.NewApp()                                                           
	app.Flags = flags.Flags                                                       
	app.Version = fmt.Sprintf("%s-%s-%s", Version, GitCommit, GitDate)            
	app.Name = "op-batcher"                                                     
	app.Usage = "Batch Submitter Service"                                       
	app.Description = "Service for generating and submitting L2 tx batches to L1" 
	app.Action = curryMain(Version)                                            
	app.Commands = []*cli.Command{                                              
		{
			Name:        "doc",
			Subcommands: doc.Subcommands,
		},
	}

	err := app.Run(os.Args)
	if err != nil {
		log.Crit("Application failed", "message", err)
	}
}

op-batcher 실행의 메인 이벤트 함수다. batcher의 버전과 Flag 설정부터, app 이름과 용도, 설명 등이 선언된다.

func curryMain(version string) func(ctx *cli.Context) error {
	return func(ctx *cli.Context) error {
		return batcher.Main(version, ctx)
	}
}

curryMain에 버전을 전달하여 batcher의 batcherSubmitter의 EntryPoint 함수인 Main()을 호출한다.

# batch_submitter.go

func Main(version string, cliCtx *cli.Context) error {
	if err := flags.CheckRequired(cliCtx); err != nil {
		return err
	}
	cfg := NewConfig(cliCtx)
	if err := cfg.Check(); err != nil {
		return fmt.Errorf("invalid CLI flags: %w", err)
	}

	l := oplog.NewLogger(cfg.LogConfig)
	opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)
	m := metrics.NewMetrics("default")
	l.Info("Initializing Batch Submitter")

	batchSubmitter, err := NewBatchSubmitterFromCLIConfig(cfg, l, m)
	if err != nil {
		l.Error("Unable to create Batch Submitter", "error", err)
		return err
	}

	if !cfg.Stopped {
		if err := batchSubmitter.Start(); err != nil {
			l.Error("Unable to start Batch Submitter", "error", err)
			return err
		}
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // Stop pprof and metrics only after main loop returns
	defer batchSubmitter.StopIfRunning(context.Background())

	pprofConfig := cfg.PprofConfig
	if pprofConfig.Enabled {
		l.Info("starting pprof", "addr", pprofConfig.ListenAddr, "port", pprofConfig.ListenPort)
		go func() {
			if err := oppprof.ListenAndServe(ctx, pprofConfig.ListenAddr, pprofConfig.ListenPort); err != nil {
				l.Error("error starting pprof", "err", err)
			}
		}()
	}

	metricsCfg := cfg.MetricsConfig
	if metricsCfg.Enabled {
		l.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
		go func() {
			if err := m.Serve(ctx, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
				l.Error("error starting metrics server", "err", err)
			}
		}()
		m.StartBalanceMetrics(ctx, l, batchSubmitter.L1Client, batchSubmitter.TxManager.From())
	}

	rpcCfg := cfg.RPCConfig
	server := oprpc.NewServer(
		rpcCfg.ListenAddr,
		rpcCfg.ListenPort,
		version,
		oprpc.WithLogger(l),
	)
	if rpcCfg.EnableAdmin {
		server.AddAPI(gethrpc.API{
			Namespace: "admin",
			Service:   rpc.NewAdminAPI(batchSubmitter),
		})
		l.Info("Admin RPC enabled")
	}
	if err := server.Start(); err != nil {
		cancel()
		return fmt.Errorf("error starting RPC server: %w", err)
	}

	m.RecordInfo(version)
	m.RecordUp()

	opio.BlockOnInterrupts()
	if err := server.Stop(); err != nil {
		l.Error("Error shutting down http server: %w", err)
	}
	return nil
}

코드를 보면 필수적인 flag를 체크하고 batcher config를 설정한 다음, 로깅객체와 메트릭 객체를 생성하는 기본적인 단계가 진행된다.

	batchSubmitter, err := NewBatchSubmitterFromCLIConfig(cfg, l, m)
	if err != nil {
		l.Error("Unable to create Batch Submitter", "error", err)
		return err
	}

	if !cfg.Stopped {
		if err := batchSubmitter.Start(); err != nil {
			l.Error("Unable to start Batch Submitter", "error", err)
			return err
		}
	}

NewBatchSubmitterFromCLIConfig로 batchSubmitter 객체를 생성한다. 해당 함수는 driver.go에 위치한 것으로, 아래와 같은 config 값으로 객체를 생성한다.

	batcherCfg := Config{
		L1Client:               l1Client,
		L2Client:               l2Client,
		RollupNode:             rollupClient,
		PollInterval:           cfg.PollInterval,
		MaxPendingTransactions: cfg.MaxPendingTransactions,
		NetworkTimeout:         cfg.TxMgrConfig.NetworkTimeout,
		TxManager:              txManager,
		Rollup:                 rcfg,
		Channel: ChannelConfig{
			SeqWindowSize:      rcfg.SeqWindowSize,
			ChannelTimeout:     rcfg.ChannelTimeout,
			MaxChannelDuration: cfg.MaxChannelDuration,
			SubSafetyMargin:    cfg.SubSafetyMargin,
			MaxFrameSize:       cfg.MaxL1TxSize - 1, // subtract 1 byte for version
			CompressorConfig:   cfg.CompressorConfig.Config(),
		},
	}

 

보면 알겠지만, 기본적으로 L1Client와 L2Client(op-geth), RollupNode(op-node) 그리고 Batch 전송 주기인 PollInterval 등이 flag와 환경변수로 Config 설정된다.

위에서 생성된 batchSubmitter는 driver.go의 batcher의 본격적인 실행함수인 Start()가 호출된다. Start()에서는 간단히 설명하자면 mutex로 함수가 종료된 후에서야 잠금해제되는 설정과 함께 loop() 함수를 고루틴으로 실행한다.

func (l *BatchSubmitter) loop() {
	defer l.wg.Done()

	ticker := time.NewTicker(l.PollInterval)
	defer ticker.Stop()

	receiptsCh := make(chan txmgr.TxReceipt[txData])
	queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions)

	for {
		select {
		case <-ticker.C:
			if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
				err := l.state.Close()
				if err != nil {
					l.log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
				}
				l.publishStateToL1(queue, receiptsCh, true)
				l.state.Clear()
				continue
			}
			l.publishStateToL1(queue, receiptsCh, false)
		case r := <-receiptsCh:
			l.handleReceipt(r)
		case <-l.shutdownCtx.Done():
			err := l.state.Close()
			if err != nil {
				l.log.Error("error closing the channel manager", "err", err)
			}
			l.publishStateToL1(queue, receiptsCh, true)
			return
		}
	}
}

loop()의 주요 객체로는 다음과 같다.

  • ticker : PollInterval의 시간 간격으로 이벤트를 보내는 객체
  • receiptCh : 트랜재션 receipt를 수신할 채널
  • queue : 트랜잭션이 쌓일 대기열

이 loop()에서는 ticker를 통해 시간간격으로 이벤트가 실행되어 주기적으로 l.loadBlocksIntoState를 호출하여 상태를 업데이트한다. 만약 reorg나 에러가 발생하면 채널을 닫고 상태를 초기화한다.

publishStateToL1를 통해서 한 트랜잭션을 L1으로 제출하는 publishTxToL1()를 비동기적으로 호출한다. 

// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
	// send all available transactions
	l1tip, err := l.l1Tip(ctx)
	if err != nil {
		l.log.Error("Failed to query L1 tip", "error", err)
		return err
	}
	l.recordL1Tip(l1tip)

	// Collect next transaction data
	txdata, err := l.state.TxData(l1tip.ID())
	if err == io.EOF {
		l.log.Trace("no transaction data available")
		return err
	} else if err != nil {
		l.log.Error("unable to get tx data", "err", err)
		return err
	}

	l.sendTransaction(txdata, queue, receiptsCh)
	return nil
}

publishTxToL1()은 트랜잭션을 L1에 제출하기 전에 l1tip이라는 L1의 마지막 블록 정보를 받아온다. l1tip()은 L1의 마지막 블록 헤더를 받아와 해당 블록 정보를 리턴해준다.

l1tip으로 L1의 마지막 블록 정보를 l.recordL1Tip()으로 batchSubmitter에 기록한다. 그리곤 l.state.TxData()를 호출하여 l1tip.ID()를 통해 L1에 제출되어야할 트랜잭션 데이터(txdata)를 받아온다. 

그리고 마지막 sendTransaction()를 호출하여 Tx를 전송한다. 이 sendTransaction에서 가스비를 계산하고 To, Txdata, Gaslimit이 포함된 트랜잭션 매니저의 candidate가 생성되어 전송된다.

반응형
Comments