pull/218/merge
Vinicius Tinti 6 years ago committed by GitHub
commit aa9be72c0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      bot.go
  2. 6
      bot_test.go
  3. 25
      types.go

@ -26,7 +26,6 @@ type BotAPI struct {
Self User `json:"-"` Self User `json:"-"`
Client *http.Client `json:"-"` Client *http.Client `json:"-"`
shutdownChannel chan interface{}
} }
// NewBotAPI creates a new BotAPI instance. // NewBotAPI creates a new BotAPI instance.
@ -45,7 +44,6 @@ func NewBotAPIWithClient(token string, client *http.Client) (*BotAPI, error) {
Token: token, Token: token,
Client: client, Client: client,
Buffer: 100, Buffer: 100,
shutdownChannel: make(chan interface{}),
} }
self, err := bot.GetMe() self, err := bot.GetMe()
@ -395,11 +393,12 @@ func (bot *BotAPI) GetWebhookInfo() (WebhookInfo, error) {
// GetUpdatesChan starts and returns a channel for getting updates. // GetUpdatesChan starts and returns a channel for getting updates.
func (bot *BotAPI) GetUpdatesChan(config UpdateConfig) UpdatesChannel { func (bot *BotAPI) GetUpdatesChan(config UpdateConfig) UpdatesChannel {
ch := make(chan Update, bot.Buffer) ch := make(chan Update, bot.Buffer)
done := make(chan struct{})
go func() { go func() {
for { for {
select { select {
case <-bot.shutdownChannel: case <-done:
return return
default: default:
} }
@ -416,28 +415,37 @@ func (bot *BotAPI) GetUpdatesChan(config UpdateConfig) UpdatesChannel {
for _, update := range updates { for _, update := range updates {
if update.UpdateID >= config.Offset { if update.UpdateID >= config.Offset {
config.Offset = update.UpdateID + 1 config.Offset = update.UpdateID + 1
ch <- update
select {
case ch <- update:
case <-done:
return
}
} }
} }
} }
}() }()
return ch updatesCh := UpdatesChannel{
} channel: ch,
done: done,
// StopReceivingUpdates stops the go routine which receives updates
func (bot *BotAPI) StopReceivingUpdates() {
if bot.Debug {
log.Println("Stopping the update receiver routine...")
} }
close(bot.shutdownChannel)
return updatesCh
} }
// ListenForWebhook registers a http handler for a webhook. // ListenForWebhook registers a http handler for a webhook.
func (bot *BotAPI) ListenForWebhook(pattern string) UpdatesChannel { func (bot *BotAPI) ListenForWebhook(pattern string) UpdatesChannel {
ch := make(chan Update, bot.Buffer) ch := make(chan Update, bot.Buffer)
done := make(chan struct{})
http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) { http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
select {
case <-done:
return
default:
}
bytes, _ := ioutil.ReadAll(r.Body) bytes, _ := ioutil.ReadAll(r.Body)
var update Update var update Update
@ -446,7 +454,12 @@ func (bot *BotAPI) ListenForWebhook(pattern string) UpdatesChannel {
ch <- update ch <- update
}) })
return ch updatesCh := UpdatesChannel{
channel: ch,
done: done,
}
return updatesCh
} }
// GetChat gets information about a chat. // GetChat gets information about a chat.

@ -552,7 +552,7 @@ func ExampleNewBotAPI() {
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
updates.Clear() updates.Clear()
for update := range updates { for update := range updates.Channel() {
if update.Message == nil { if update.Message == nil {
continue continue
} }
@ -594,7 +594,7 @@ func ExampleNewWebhook() {
updates := bot.ListenForWebhook("/" + bot.Token) updates := bot.ListenForWebhook("/" + bot.Token)
go http.ListenAndServeTLS("0.0.0.0:8443", "cert.pem", "key.pem", nil) go http.ListenAndServeTLS("0.0.0.0:8443", "cert.pem", "key.pem", nil)
for update := range updates { for update := range updates.Channel() {
log.Printf("%+v\n", update) log.Printf("%+v\n", update)
} }
} }
@ -612,7 +612,7 @@ func ExampleInlineConfig() {
updates := bot.GetUpdatesChan(u) updates := bot.GetUpdatesChan(u)
for update := range updates { for update := range updates.Channel() {
if update.InlineQuery == nil { // if no inline query, ignore it if update.InlineQuery == nil { // if no inline query, ignore it
continue continue
} }

@ -40,13 +40,28 @@ type Update struct {
Poll *Poll `json:"poll"` Poll *Poll `json:"poll"`
} }
// UpdatesChannel is the channel for getting updates. // UpdatesChannel is the struct that holds a channel for getting updates.
type UpdatesChannel <-chan Update type UpdatesChannel struct {
channel chan Update
done chan struct{}
}
// Return Update channel
func (updatesCh UpdatesChannel) Channel() <-chan Update {
return updatesCh.channel
}
// Stop channel feeding by goroutine or http handlers.
//
// It may not feed the update channel with all fetched/received Updates.
func (updatesCh UpdatesChannel) Shutdown() {
updatesCh.done <- struct{}{}
}
// Clear discards all unprocessed incoming updates. // Clear discards all unprocessed incoming updates.
func (ch UpdatesChannel) Clear() { func (updatesCh UpdatesChannel) Clear() {
for len(ch) != 0 { for len(updatesCh.channel) != 0 {
<-ch <-updatesCh.channel
} }
} }