Remove bot method StopReceivingUpdates()

The StopReceivingUpdates() was used to stop a goroutine created at
GetUpdatesChannel().

If StopReceivingUpdates() were called twice or if GetUpdatesChannel()
were called after a StopReceivingUpdates() the bot behavior would be
incorrect due to 'shutdownChannel' close.

This commit removes StopReceivingUpdates() and modifies UpdatesChannel
struct to have a Shutdown() method which can stop both the associated
goroutine and http handler to feed the updates channel.

Signed-off-by: Vinicius Tinti <viniciustinti@gmail.com>
pull/218/head
Vinicius Tinti 7 years ago
parent 5781187bc2
commit 87f38fc084
  1. 49
      bot.go
  2. 6
      bot_test.go
  3. 25
      types.go

@ -24,9 +24,8 @@ type BotAPI struct {
Debug bool `json:"debug"` Debug bool `json:"debug"`
Buffer int `json:"buffer"` Buffer int `json:"buffer"`
Self User `json:"-"` Self User `json:"-"`
Client *http.Client `json:"-"` Client *http.Client `json:"-"`
shutdownChannel chan interface{}
} }
// NewBotAPI creates a new BotAPI instance. // NewBotAPI creates a new BotAPI instance.
@ -42,10 +41,9 @@ func NewBotAPI(token string) (*BotAPI, error) {
// It requires a token, provided by @BotFather on Telegram. // It requires a token, provided by @BotFather on Telegram.
func NewBotAPIWithClient(token string, client *http.Client) (*BotAPI, error) { func NewBotAPIWithClient(token string, client *http.Client) (*BotAPI, error) {
bot := &BotAPI{ bot := &BotAPI{
Token: token, Token: token,
Client: client, Client: client,
Buffer: 100, Buffer: 100,
shutdownChannel: make(chan interface{}),
} }
self, err := bot.GetMe() self, err := bot.GetMe()
@ -380,11 +378,12 @@ func (bot *BotAPI) GetWebhookInfo() (WebhookInfo, error) {
// GetUpdatesChan starts and returns a channel for getting updates. // GetUpdatesChan starts and returns a channel for getting updates.
func (bot *BotAPI) GetUpdatesChan(config UpdateConfig) UpdatesChannel { func (bot *BotAPI) GetUpdatesChan(config UpdateConfig) UpdatesChannel {
ch := make(chan Update, bot.Buffer) ch := make(chan Update, bot.Buffer)
done := make(chan struct{})
go func() { go func() {
for { for {
select { select {
case <-bot.shutdownChannel: case <-done:
return return
default: default:
} }
@ -401,28 +400,37 @@ func (bot *BotAPI) GetUpdatesChan(config UpdateConfig) UpdatesChannel {
for _, update := range updates { for _, update := range updates {
if update.UpdateID >= config.Offset { if update.UpdateID >= config.Offset {
config.Offset = update.UpdateID + 1 config.Offset = update.UpdateID + 1
ch <- update
select {
case ch <- update:
case <-done:
return
}
} }
} }
} }
}() }()
return ch updatesCh := UpdatesChannel{
} channel: ch,
done: done,
// StopReceivingUpdates stops the go routine which receives updates
func (bot *BotAPI) StopReceivingUpdates() {
if bot.Debug {
log.Println("Stopping the update receiver routine...")
} }
close(bot.shutdownChannel)
return updatesCh
} }
// ListenForWebhook registers a http handler for a webhook. // ListenForWebhook registers a http handler for a webhook.
func (bot *BotAPI) ListenForWebhook(pattern string) UpdatesChannel { func (bot *BotAPI) ListenForWebhook(pattern string) UpdatesChannel {
ch := make(chan Update, bot.Buffer) ch := make(chan Update, bot.Buffer)
done := make(chan struct{})
http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) { http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
select {
case <-done:
return
default:
}
bytes, _ := ioutil.ReadAll(r.Body) bytes, _ := ioutil.ReadAll(r.Body)
var update Update var update Update
@ -431,7 +439,12 @@ func (bot *BotAPI) ListenForWebhook(pattern string) UpdatesChannel {
ch <- update ch <- update
}) })
return ch updatesCh := UpdatesChannel{
channel: ch,
done: done,
}
return updatesCh
} }
// GetChat gets information about a chat. // GetChat gets information about a chat.

@ -543,7 +543,7 @@ func ExampleNewBotAPI() {
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
updates.Clear() updates.Clear()
for update := range updates { for update := range updates.Channel() {
if update.Message == nil { if update.Message == nil {
continue continue
} }
@ -585,7 +585,7 @@ func ExampleNewWebhook() {
updates := bot.ListenForWebhook("/" + bot.Token) updates := bot.ListenForWebhook("/" + bot.Token)
go http.ListenAndServeTLS("0.0.0.0:8443", "cert.pem", "key.pem", nil) go http.ListenAndServeTLS("0.0.0.0:8443", "cert.pem", "key.pem", nil)
for update := range updates { for update := range updates.Channel() {
log.Printf("%+v\n", update) log.Printf("%+v\n", update)
} }
} }
@ -603,7 +603,7 @@ func ExampleInlineConfig() {
updates := bot.GetUpdatesChan(u) updates := bot.GetUpdatesChan(u)
for update := range updates { for update := range updates.Channel() {
if update.InlineQuery == nil { // if no inline query, ignore it if update.InlineQuery == nil { // if no inline query, ignore it
continue continue
} }

@ -39,13 +39,28 @@ type Update struct {
PreCheckoutQuery *PreCheckoutQuery `json:"pre_checkout_query"` PreCheckoutQuery *PreCheckoutQuery `json:"pre_checkout_query"`
} }
// UpdatesChannel is the channel for getting updates. // UpdatesChannel is the struct that holds a channel for getting updates.
type UpdatesChannel <-chan Update type UpdatesChannel struct {
channel chan Update
done chan struct{}
}
// Return Update channel
func (updatesCh UpdatesChannel) Channel() <-chan Update {
return updatesCh.channel
}
// Stop channel feeding by goroutine or http handlers.
//
// It may not feed the update channel with all fetched/received Updates.
func (updatesCh UpdatesChannel) Shutdown() {
updatesCh.done <- struct{}{}
}
// Clear discards all unprocessed incoming updates. // Clear discards all unprocessed incoming updates.
func (ch UpdatesChannel) Clear() { func (updatesCh UpdatesChannel) Clear() {
for len(ch) != 0 { for len(updatesCh.channel) != 0 {
<-ch <-updatesCh.channel
} }
} }