chore: cleanup patch code

This commit is contained in:
wwqgtxx
2024-08-27 11:04:42 +08:00
parent 6e04e1e9dc
commit 3e2c9ce821
9 changed files with 83 additions and 74 deletions

View File

@@ -27,6 +27,8 @@ type extraOption struct {
}
type HealthCheck struct {
ctx context.Context
ctxCancel context.CancelFunc
url string
extra map[string]*extraOption
mu sync.Mutex
@@ -36,7 +38,6 @@ type HealthCheck struct {
lazy bool
expectedStatus utils.IntRanges[uint16]
lastTouch atomic.TypedValue[time.Time]
done chan struct{}
singleDo *singledo.Single[struct{}]
timeout time.Duration
}
@@ -59,7 +60,7 @@ func (hc *HealthCheck) process() {
} else {
log.Debugln("Skip once health check because we are lazy")
}
case <-hc.done:
case <-hc.ctx.Done():
ticker.Stop()
hc.stop()
return
@@ -146,7 +147,7 @@ func (hc *HealthCheck) check() {
_, _, _ = hc.singleDo.Do(func() (struct{}, error) {
id := utils.NewUUIDV4().String()
log.Debugln("Start New Health Checking {%s}", id)
b, _ := batch.New[bool](context.Background(), batch.WithConcurrencyNum[bool](10))
b, _ := batch.New[bool](hc.ctx, batch.WithConcurrencyNum[bool](10))
// execute default health check
option := &extraOption{filters: nil, expectedStatus: hc.expectedStatus}
@@ -195,7 +196,7 @@ func (hc *HealthCheck) execute(b *batch.Batch[bool], url, uid string, option *ex
p := proxy
b.Go(p.Name(), func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), hc.timeout)
ctx, cancel := context.WithTimeout(hc.ctx, hc.timeout)
defer cancel()
log.Debugln("Health Checking, proxy: %s, url: %s, id: {%s}", p.Name(), url, uid)
_, _ = p.URLTest(ctx, url, expectedStatus)
@@ -206,7 +207,7 @@ func (hc *HealthCheck) execute(b *batch.Batch[bool], url, uid string, option *ex
}
func (hc *HealthCheck) close() {
hc.done <- struct{}{}
hc.ctxCancel()
}
func NewHealthCheck(proxies []C.Proxy, url string, timeout uint, interval uint, lazy bool, expectedStatus utils.IntRanges[uint16]) *HealthCheck {
@@ -217,8 +218,11 @@ func NewHealthCheck(proxies []C.Proxy, url string, timeout uint, interval uint,
if timeout == 0 {
timeout = 5000
}
ctx, cancel := context.WithCancel(context.Background())
return &HealthCheck{
ctx: ctx,
ctxCancel: cancel,
proxies: proxies,
url: url,
timeout: time.Duration(timeout) * time.Millisecond,
@@ -226,7 +230,6 @@ func NewHealthCheck(proxies []C.Proxy, url string, timeout uint, interval uint,
interval: time.Duration(interval) * time.Second,
lazy: lazy,
expectedStatus: expectedStatus,
done: make(chan struct{}, 1),
singleDo: singledo.NewSingle[struct{}](time.Second),
}
}

View File

@@ -14,23 +14,6 @@ type UpdatableProvider interface {
UpdatedAt() time.Time
}
func (pp *proxySetProvider) UpdatedAt() time.Time {
return pp.Fetcher.UpdatedAt
}
func (pp *proxySetProvider) Close() error {
pp.healthCheck.close()
pp.Fetcher.Destroy()
return nil
}
func (cp *compatibleProvider) Close() error {
cp.healthCheck.close()
return nil
}
func Suspend(s bool) {
suspended = s
}

View File

@@ -54,7 +54,7 @@ func (pp *proxySetProvider) MarshalJSON() ([]byte, error) {
"proxies": pp.Proxies(),
"testUrl": pp.healthCheck.url,
"expectedStatus": pp.healthCheck.expectedStatus.String(),
"updatedAt": pp.UpdatedAt,
"updatedAt": pp.UpdatedAt(),
"subscriptionInfo": pp.subscriptionInfo,
})
}
@@ -164,9 +164,9 @@ func (pp *proxySetProvider) closeAllConnections() {
})
}
func stopProxyProvider(pd *ProxySetProvider) {
pd.healthCheck.close()
_ = pd.Fetcher.Destroy()
func (pp *proxySetProvider) Close() error {
pp.healthCheck.close()
return pp.Fetcher.Close()
}
func NewProxySetProvider(name string, interval time.Duration, filter string, excludeFilter string, excludeType string, dialerProxy string, override OverrideSchema, vehicle types.Vehicle, hc *HealthCheck) (*ProxySetProvider, error) {
@@ -200,10 +200,15 @@ func NewProxySetProvider(name string, interval time.Duration, filter string, exc
fetcher := resource.NewFetcher[[]C.Proxy](name, interval, vehicle, proxiesParseAndFilter(filter, excludeFilter, excludeTypeArray, filterRegs, excludeFilterReg, dialerProxy, override), proxiesOnUpdate(pd))
pd.Fetcher = fetcher
wrapper := &ProxySetProvider{pd}
runtime.SetFinalizer(wrapper, stopProxyProvider)
runtime.SetFinalizer(wrapper, (*ProxySetProvider).Close)
return wrapper, nil
}
func (pp *ProxySetProvider) Close() error {
runtime.SetFinalizer(pp, nil)
return pp.proxySetProvider.Close()
}
// CompatibleProvider for auto gc
type CompatibleProvider struct {
*compatibleProvider
@@ -274,8 +279,9 @@ func (cp *compatibleProvider) RegisterHealthCheckTask(url string, expectedStatus
cp.healthCheck.registerHealthCheckTask(url, expectedStatus, filter, interval)
}
func stopCompatibleProvider(pd *CompatibleProvider) {
pd.healthCheck.close()
func (cp *compatibleProvider) Close() error {
cp.healthCheck.close()
return nil
}
func NewCompatibleProvider(name string, proxies []C.Proxy, hc *HealthCheck) (*CompatibleProvider, error) {
@@ -294,10 +300,15 @@ func NewCompatibleProvider(name string, proxies []C.Proxy, hc *HealthCheck) (*Co
}
wrapper := &CompatibleProvider{pd}
runtime.SetFinalizer(wrapper, stopCompatibleProvider)
runtime.SetFinalizer(wrapper, (*CompatibleProvider).Close)
return wrapper, nil
}
func (cp *CompatibleProvider) Close() error {
runtime.SetFinalizer(cp, nil)
return cp.compatibleProvider.Close()
}
func proxiesOnUpdate(pd *proxySetProvider) func([]C.Proxy) {
return func(elm []C.Proxy) {
pd.setProxies(elm)