从生产者消费者问题看GO的并发机制

Go语言诞生已经两周年了。最近由于参加ECUG的缘故,我学习了一下Go语言的语法,就立刻被Go语言的特性所吸引。

Go语言一些背景就不介绍了,大家可以自行到Go主页进行查看(可能要准备梯子,和python.org一样悲催的命运)。Go的优点很多,最重要的特点之一是其第一个在语言层面对并发进行了支持。正如Go设计的主旨之一,就是现在的语言都没有对多核进行优化,对并发支持很不好。

Java,C#,Python这些语言对多线程编程有一样的模型。本文就生产者消费者问题对Java和Go进行对比。

生产者消费者模型大家应该很熟了,在操作系统的一些概念中经常提到,简单地说,就是生产者生产一些数据,然后放到buffer中,同时消费者从buffer中来取这些数据。这样就让生产消费变成了异步的两个过程。当buffer中没有数据时,消费者就进入等待过程;而当buffer中数据已满时,生产者则需要等待buffer中数据被取出后再写入。

Go语言中提出了channel的概念,它实际上就是一个管道,也可以理解为消息队列。Go线程(这里要注意,Go线程不同意一般的系统线程,而是类似于协程的概念,Go中称为goroutine)之间可以通过channel来进行通信,而channel本身是同步的。Go当中不鼓励通过加锁的方式来进行共享内存操作,而是鼓励通过管道通信来共享内存。我们来看具体的实现就明白了。

首先给出传统的Java中实现生产者消费者问题的代码。我们先写一个Mixin类,来进行对buffer中数据的取出和放入的操作。

public class Mixin {
	private int BUFFERSIZE = 0;
	private int[] Buffer;
	private int index = -1;
	
	public Mixin(int BufferSize) {
		this.BUFFERSIZE = BufferSize;
		this.Buffer = new int[this.BUFFERSIZE];
	}
	
	public synchronized void put(int item) {
		while(this.index >= this.BUFFERSIZE - 1) {
			try{
				wait();
			}
			catch(InterruptedException e) {
			}
		}
		this.Buffer[++this.index] = item;
		notifyAll();
	}
	
	public synchronized int get() {
		while(this.index < 0) {
			try{
				wait();
			}
			catch(InterruptedException e) { }
		}
		int result = this.Buffer[this.index--];
		notifyAll();
		return result;
	}
}

put和get操作都需要加上synchronized关键字来保证同步,代码很好理解就不讲了。接着,我们分别实现Producer和Consumer类,它们都继承了Thread类,并实现了run方法。

Producer类:

public class Producer extends Thread {
	private Mixin m = null;
	private int thisId;
	
	public Producer(Mixin m, int id) {
		this.m = m;
		this.thisId = id;
	}
	
	public synchronized void run() {
		for(int i=0; i<10; i++) {
			this.m.put(i);
			System.out.println("Producer " + this.thisId + " produces data: " + i);
			try {
				sleep(10);
			} catch (InterruptedException e) { }
		}
	}
}

Consumer类:

public class Consumer extends Thread {
	private Mixin m = null;
	private int thisId;
	
	public Consumer(Mixin m, int id) {
		this.m = m;
		this.thisId = id;
	}
	
	public synchronized void run() {
		for(int i=0; i<20; i++) {
			int item = this.m.get();
			System.out.println("Cosumer " + this.thisId + " get data: " + item);
			try {
				sleep(10);
			} catch (InterruptedException e) { }
		}
	}
}

Producer就是简单地把一些数放入buffer中。Java中通过wait方法来阻塞当前线程,而通过notifyAll来唤起所有阻塞的线程。最后我们写一个类来运行main函数,我们起了两个Producer线程和一个Cosumer线程。

public class ProducerConsumer {

	public static void main(String[] args) {
		Mixin m = new Mixin(6);
		Producer p1 = new Producer(m, 1);
		Producer p2 = new Producer(m, 2);
		Consumer c = new Consumer(m, 1);
		
		p1.start();
		p2.start();
		c.start();
	
	}

}

在Go中,这个问题变得异常简单。我们实现两个函数来分别表示Producer和Consumer,通过go关键字+函数名的方式,来启动goroutine。而由于channel的支持,我们可以在Producer和Consumer之间直接通过channel来进行通信。代码如下:

package main

import (
	"fmt"
	"time"
)

func Producer(id int, item chan int) {
	for i:=0; i<10; i++ {
		item <- i
		fmt.Printf("Producer %d produces data: %d\n", id, i)
		time.Sleep(10 * 1e6)
	}
}

func Consumer(id int, item chan int) {
	for i:=0; i<20; i++ {
		c_item := <-item
		fmt.Printf("Consumer %d get data: %d\n", id, c_item)
		time.Sleep(10 * 1e6)
	}
}

func main() {
	item := make(chan int, 6)
	go Producer(1, item)
	go Producer(2, item)
	go Consumer(1, item)
	
	time.Sleep(1 * 1e9)
}

channel的创建通过内建的make函数,第二个参数是channel通道的大小。通过item<-i,Producer把i放入item这个channel中,而Consumer通过<-item操作来从channel中取数据。当channel中的数据超过了其负荷值时,Producer这个goroutine就简单地阻塞,同样,当channel中没有数据时,Consumer也相应会阻塞。

可以看到,在语言级别上对并发的支持让Go中的多线程编程和异步编程变得很简单,也很易懂。

标签

赞这篇文章

分享到

13个评论

  1. youngterxyf

    你也去了啊,我也去了...
    上面的Producer和Consumer函数中你为什么要加个time.Sleep()函数调用呀?

  2. @秦续业 作者

    你也去ECUG了?
    加sleep就是为了调节生产和消费的速率的,可以让生产比较快啊等等,不过代码里设成一样的了

  3. @秦续业 作者

    恩恩,我也觉得是,看到xyf你的名了,呵呵,但是看上次留言的邮箱和这次不一样,所以没确定:)

  4. @秦续业 作者

    突然想起来了...到时候我去我同学那边,从那边过去。学校去实在太远了,反正大会上咱也能一起的...

  5. wolf

    很多语言都可以直接或间接的支持并发,但没几个语言可以做到“并行”,golang应该就是其中之一了

给作者留言

关于作者

残阳似血(@秦续业),程序猿一枚,把梦想揣进口袋的挨踢工作者。现加入阿里云,研究僧毕业于上海交通大学软件学院ADC实验室。熟悉分布式数据分析(DataFrame并行化框架)、基于图模型的分布式数据库和并行计算、Dpark/Spark以及Python web开发(Django、tornado)等。

博客分类

点击排行

标签云

扫描访问

主题

残阳似血的微博